Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:32 UTC

[16/50] [abbrv] samza git commit: SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
deleted file mode 100644
index 958d07c..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package old.checkpoint
-
-import java.util
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-
-import scala.collection.JavaConversions._
-
-/**
- * Kafka Checkpoint Log-specific key used to identify what type of entry is
- * written for any particular log entry.
- *
- * @param map Backing map to hold key values
- */
-class KafkaCheckpointLogKey private (val map: Map[String, String]) {
-  // This might be better as a case class...
-  import KafkaCheckpointLogKey._
-
-  /**
-   * Serialize this key to bytes
-   * @return Key as bytes
-   */
-  def toBytes(): Array[Byte] = {
-    val jMap = new util.HashMap[String, String](map.size)
-    jMap.putAll(map)
-
-    JSON_MAPPER.writeValueAsBytes(jMap)
-  }
-
-  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log key"))
-
-  /**
-   * Is this key for a checkpoint entry?
-   *
-   * @return true iff this key's entry is for a checkpoint
-   */
-  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
-
-  /**
-   * Is this key for a changelog partition mapping?
-   *
-   * @return true iff this key's entry is for a changelog partition mapping
-   */
-  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
-
-  /**
-   * If this Key is for a checkpoint entry, return its associated TaskName.
-   *
-   * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
-   */
-  def getCheckpointTaskName = {
-    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
-    new TaskName(asString)
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: KafkaCheckpointLogKey =>
-      (that canEqual this) &&
-        map == that.map
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    val state = Seq(map)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}
-
-object KafkaCheckpointLogKey {
-  /**
-   *  Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
-   *  type, either a checkpoint or a changelog-partition-mapping.
-   */
-  val CHECKPOINT_KEY_KEY = "type"
-  val CHECKPOINT_KEY_TYPE = "checkpoint"
-  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
-  val CHECKPOINT_TASKNAME_KEY = "taskName"
-  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
-
-  /**
-   * Partition mapping keys have no dynamic values, so we just need one instance.
-   */
-  val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
-
-  private val JSON_MAPPER = new ObjectMapper()
-  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
-
-  var systemStreamPartitionGrouperFactoryString:Option[String] = None
-
-  /**
-   * Set the name of the factory configured to provide the SystemStreamPartition grouping
-   * so it can be included in the key.
-   *
-   * @param str Config value of SystemStreamPartition Grouper Factory
-   */
-  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
-    systemStreamPartitionGrouperFactoryString = Some(str)
-  }
-
-  /**
-   * Get the name of the factory configured to provide the SystemStreamPartition grouping
-   * so it can be included in the key
-   */
-  def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
-
-  /**
-   * Build a key for a checkpoint log entry for a particular TaskName
-   * @param taskName TaskName for which to build this checkpoint entry
-   *
-   * @return Key for checkpoint log entry
-   */
-  def getCheckpointKey(taskName:TaskName) = {
-    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
-      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
-      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
-
-    new KafkaCheckpointLogKey(map)
-  }
-
-  /**
-   * Build a key for a changelog partition mapping entry
-   *
-   * @return Key for changelog partition mapping entry
-   */
-  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
-
-  /**
-   * Deserialize a Kafka checkpoint log key
-   * @param bytes Serialized (via JSON) Kafka checkpoint log key
-   * @return Checkpoint log key
-   */
-  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
-    try {
-      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
-
-      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
-        throw new SamzaException("No type entry in checkpoint key: " + jmap)
-      }
-
-      // Only checkpoint keys have ssp grouper factory keys
-      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
-        val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
-
-        if (sspGrouperFactory == null) {
-          throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
-        }
-
-        if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
-          throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
-        }
-      }
-
-      new KafkaCheckpointLogKey(jmap.toMap)
-    } catch {
-      case e: Exception =>
-        throw new SamzaException("Exception while deserializing checkpoint key", e)
-    }
-  }
-}
-
-class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
-  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
-    ") does not match value from current configuration (" + inConfig + ").  " +
-    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
-}
\ No newline at end of file
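
For context on the key format above: each checkpoint log key serializes to a flat JSON map via toBytes()/fromBytes(). A minimal round-trip sketch, assuming the grouper factory string has been set first (the factory class and task name below are illustrative):

    import org.apache.samza.container.TaskName

    // getCheckpointKey refuses to build a key until the grouper factory string is set.
    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(
      "org.apache.samza.container.grouper.stream.GroupByPartitionFactory")

    val key = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("Partition 0"))
    val bytes = key.toBytes()
    // bytes decode to JSON along the lines of:
    //   {"type":"checkpoint","taskName":"Partition 0",
    //    "systemstreampartition-grouper-factory":"...GroupByPartitionFactory"}
    assert(KafkaCheckpointLogKey.fromBytes(bytes) == key)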

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
deleted file mode 100644
index 627631a..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.api._
-import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, TopicExistsException, UnknownTopicOrPartitionException}
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.container.TaskName
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil, Logging, TopicMetadataStore}
-
-import scala.collection.mutable
-
-/**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
- * To read a checkpoint for a specific taskName, we find the newest message
- * keyed to that taskName. If there is no such message, no checkpoint data
- * exists.  The underlying log has a single partition into which all
- * checkpoints and TaskName to changelog partition mappings are written.
- */
-class KafkaCheckpointManager(clientId: String,
-                              checkpointTopic: String,
-                              systemName: String,
-                              socketTimeout: Int,
-                              bufferSize: Int,
-                              fetchSize: Int,
-                              metadataStore: TopicMetadataStore,
-                              connectProducer: () => Producer[Array[Byte], Array[Byte]],
-                              connectZk: () => ZkClient,
-                              systemStreamPartitionGrouperFactoryString: String,
-                              retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-                              serde: CheckpointSerde = new CheckpointSerde,
-                              checkpointTopicProperties: Properties = new Properties) extends Logging {
-  import KafkaCheckpointManager._
-
-  var taskNames = Set[TaskName]()
-  var producer: Producer[Array[Byte], Array[Byte]] = null
-  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
-
-  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
-
-  KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
-
-  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
-
-  private def getConsumer(): SimpleConsumer = {
-    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-    val metadata = metadataMap(checkpointTopic)
-    val partitionMetadata = metadata.partitionsMetadata
-      .filter(_.partitionId == 0)
-      .headOption
-      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
-    val leader = partitionMetadata
-      .leader
-      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
-    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
-    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
-  }
-
-  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
-  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
-    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
-      .partitionErrorAndOffsets
-      .get(topicAndPartition)
-      .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
-    // Fail or retry if there was an issue with the offset request.
-    KafkaUtil.maybeThrowException(offsetResponse.error)
-
-    val offset: Long = offsetResponse
-      .offsets
-      .headOption
-      .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
-    offset
-  }
-
-  /**
-   * Read the last checkpoint for specified TaskName
-   *
-   * @param taskName Specific Samza taskName for which to get the last checkpoint.
-   */
-  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
-    if (!taskNames.contains(taskName)) {
-      throw new SamzaException(taskName + " not registered with this CheckpointManager")
-    }
-
-    info("Reading checkpoint for taskName " + taskName)
-
-    if (taskNamesToOffsets == null) {
-      info("No TaskName to checkpoint mapping provided.  Reading for first time.")
-      taskNamesToOffsets = readCheckpointsFromLog()
-    } else {
-      info("Already existing checkpoint mapping.  Merging new offsets")
-      taskNamesToOffsets ++= readCheckpointsFromLog()
-    }
-
-    val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
-
-    info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
-
-    checkpoint
-  }
-
-  /**
-   * Read through the entire log, discarding changelog mappings, and build a map of TaskNames to Checkpoints
-   */
-  def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
-    val checkpoints = mutable.Map[TaskName, Checkpoint]()
-
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
-
-    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
-      val taskName = checkpointKey.getCheckpointTaskName
-      val checkpoint = serde.fromBytes(Utils.readBytes(payload))
-      debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
-      checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
-    }
-
-    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
-
-    checkpoints.toMap /* of the immutable kind */
-  }
-
-  /**
-   * Read through the entire log, discarding checkpoints and finding the latest changelogPartitionMapping.
-   *
-   * Lots of code is duplicated from the checkpoint method; it would be better to refactor this into
-   * AM-based checkpoint log reading.
-   */
-  def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
-    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
-
-    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
-      changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
-
-      debug("Adding changelog partition mapping" + changelogPartitionMapping)
-    }
-
-    readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
-
-    changelogPartitionMapping
-  }
-
-  /**
-   * Common code for reading both changelog partition mappings and checkpoints from the log
-   *
-   * @param entryType Name of the entry type being read, used in log messages
-   * @param handleEntry Code to handle an entry in the log once it's found
-   */
-  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
-                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-    retryBackoff.run[Unit](
-      loop => {
-        val consumer = getConsumer()
-
-        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
-
-        try {
-          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
-          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
-          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
-          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
-          if (offset < 0) {
-            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
-            return
-          }
-
-          while (offset < latestOffset) {
-            val request = new FetchRequestBuilder()
-              .addFetch(checkpointTopic, 0, offset, fetchSize)
-              .maxWait(500)
-              .minBytes(1)
-              .clientId(clientId)
-              .build
-
-            val fetchResponse = consumer.fetch(request)
-            if (fetchResponse.hasError) {
-              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
-              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
-              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
-                return
-              }
-              KafkaUtil.maybeThrowException(errorCode)
-            }
-
-            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
-              offset = response.nextOffset
-              startingOffset = Some(offset) // For next time we call
-
-              if (!response.message.hasKey) {
-                throw new KafkaCheckpointException("Encountered message without key.")
-              }
-
-              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
-
-              if (!shouldHandleEntry(checkpointKey)) {
-                debug("Skipping " + entryType + " entry with key " + checkpointKey)
-              } else {
-                handleEntry(response.message.payload, checkpointKey)
-              }
-            }
-          }
-        } finally {
-          consumer.close()
-        }
-
-        loop.done
-        Unit
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: KafkaCheckpointException => throw e
-          case e: Exception =>
-            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
-            debug("Exception detail:", e)
-        }
-      }
-    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
-  }
-
-  def topicExists: Boolean = {
-    val zkClient = connectZk()
-    try {
-      AdminUtils.topicExists(zkClient, checkpointTopic)
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def start {
-    if (topicExists) {
-      validateTopic
-    } else {
-      throw new SamzaException("Failed to start KafkaCheckpointManager for non-existing checkpoint topic. KafkaCheckpointManager should only be used for migration purpose.")
-    }
-  }
-
-  def register(taskName: TaskName) {
-    debug("Adding taskName " + taskName + " to " + this)
-    taskNames += taskName
-  }
-
-  def stop = {
-    if (producer != null) {
-      producer.close
-    }
-  }
-
-  def validateTopic =  {
-    info("Validating checkpoint topic %s." format checkpointTopic)
-    retryBackoff.run(
-      loop => {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(checkpointTopic)
-        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
-        val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount != 1) {
-          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
-        }
-
-        info("Successfully validated checkpoint topic %s." format checkpointTopic)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: KafkaCheckpointException => throw e
-          case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
-}
-
-object KafkaCheckpointManager {
-  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
-  val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
-
-/**
- * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
- * one to signal a hard failure, and the other to retry. The
- * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
- * CheckpointManager can't recover from.
- */
-class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
-  def this(s: String) = this(s, null)
-}
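
Both readLog and validateTopic above lean on the same ExponentialSleepStrategy idiom: the first closure does the work and calls loop.done on success, while the second inspects the exception and either rethrows (hard failure) or returns normally so that run() sleeps and retries. A minimal sketch of the idiom in isolation; fetchOnce is a hypothetical stand-in for a transiently failing call:

    import org.apache.samza.SamzaException
    import org.apache.samza.util.ExponentialSleepStrategy

    def fetchOnce(): String = "ok"  // hypothetical; may throw transiently

    val retryBackoff = new ExponentialSleepStrategy
    val result = retryBackoff.run[String](
      loop => {
        val value = fetchOnce()
        loop.done            // mark success so run() stops looping
        value
      },
      (exception, loop) => exception match {
        case e: IllegalArgumentException => throw e  // unrecoverable: rethrow to fail fast
        case e: Exception => ()                      // transient: fall through, sleep, retry
      }
    ).getOrElse(throw new SamzaException("fetchOnce failed after retries"))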

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
deleted file mode 100644
index 189752a..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import java.util.Properties
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
-
-object KafkaCheckpointManagerFactory {
-  /**
-   * Version number to track the format of the checkpoint log
-   */
-  val CHECKPOINT_LOG_VERSION_NUMBER = 1
-
-  val INJECTED_PRODUCER_PROPERTIES = Map(
-    "acks" -> "all",
-    // Forcibly disable compression because Kafka doesn't support compression
-    // on log compacted topics. Details in SAMZA-586.
-    "compression.type" -> "none")
-
-  // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there
-  // are fewer useless (overwritten) messages to read from the checkpoint
-  // topic.
-  def getCheckpointTopicProperties(config: Config) = {
-    val segmentBytes = "26214400"
-
-    (new Properties /: Map(
-      "cleanup.policy" -> "compact",
-      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
-  }
-}
-
-class KafkaCheckpointManagerFactory extends Logging {
-  import KafkaCheckpointManagerFactory._
-
-  def getCheckpointManager(config: Config, registry: MetricsRegistry): KafkaCheckpointManager = {
-    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
-    val systemName = Option(config.get("task.checkpoint.system")).getOrElse(throw new SamzaException("no system defined for checkpoint manager, cannot perform migration."))
-    val producerConfig = config.getKafkaSystemProducerConfig(
-      systemName,
-      clientId,
-      INJECTED_PRODUCER_PROPERTIES)
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
-    val socketTimeout = consumerConfig.socketTimeoutMs
-    val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
-
-    val connectProducer = () => {
-      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
-    }
-    val zkConnect = Option(consumerConfig.zkConnect)
-      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
-    val connectZk = () => {
-      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    }
-    val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
-    val jobId = config.getJobId.getOrElse("1")
-    val bootstrapServers = producerConfig.bootsrapServers
-    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
-    val checkpointTopic = getTopic(jobName, jobId)
-
-    // Find out the SSPGrouperFactory class so it can be included/verified in the key
-    val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
-
-    new KafkaCheckpointManager(
-      clientId,
-      checkpointTopic,
-      systemName,
-      socketTimeout,
-      bufferSize,
-      fetchSize,
-      metadataStore,
-      connectProducer,
-      connectZk,
-      systemStreamPartitionGrouperFactoryString,
-      checkpointTopicProperties = getCheckpointTopicProperties(config))
-  }
-
-  private def getTopic(jobName: String, jobId: String) =
-    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-}
\ No newline at end of file
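
For reference, the getTopic helper above derives the checkpoint topic name by substituting hyphens for underscores in the job name and id, so they cannot collide with the naming scheme's own underscore separators. For example:

    // "__samza_checkpoint_ver_%d_for_%s_%s" with underscores mapped to hyphens:
    getTopic("my_job", "1")  // => "__samza_checkpoint_ver_1_for_my-job_1"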

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
deleted file mode 100644
index 32afe4c..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.config.Config
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.JobCoordinator
-import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.migration.MigrationPlan
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util.{Logging, NoOpMetricsRegistry}
-import scala.collection.JavaConverters._
-
-class KafkaCheckpointMigration extends MigrationPlan with Logging {
-  val source = "CHECKPOINTMIGRATION"
-  val migrationKey = "CheckpointMigration09to10"
-  val migrationVal = "true"
-
-  def migrate(config: Config, getManager:() => KafkaCheckpointManager): Unit = {
-    val coordinatorFactory = new CoordinatorStreamSystemFactory
-    val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-    var manager = getManager()
-    // make sure to validate that we only perform migration when the checkpoint topic exists
-    if (manager.topicExists) {
-      manager.validateTopic
-      val checkpointMap = manager.readCheckpointsFromLog()
-      manager.stop
-
-      manager = getManager()
-      val changelogMap = manager.readChangeLogPartitionMapping()
-      manager.stop
-
-      val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-      if (migrationVerification(coordinatorSystemConsumer)) {
-        info("Migration %s was already performed, doing nothing" format migrationKey)
-        return
-      }
-      info("No previous migration for %s were detected, performing migration" format migrationKey)
-      val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
-      checkpointManager.start()
-      checkpointMap.foreach { case (taskName: TaskName, checkpoint: Checkpoint) => checkpointManager.writeCheckpoint(taskName, checkpoint) }
-      val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
-      changelogPartitionManager.start()
-      changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
-      changelogPartitionManager.stop()
-      checkpointManager.stop()
-    }
-    migrationCompletionMark(coordinatorSystemProducer)
-  }
-
-  override def migrate(config: Config) {
-    val factory = new KafkaCheckpointManagerFactory
-    def getManager() = factory.getCheckpointManager(config, new NoOpMetricsRegistry)
-    migrate(config, getManager)
-  }
-
-  def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
-    coordinatorSystemConsumer.register()
-    coordinatorSystemConsumer.start()
-    coordinatorSystemConsumer.bootstrap()
-    val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
-    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
-    stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
-  }
-
-  def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
-    info("Marking completion of migration %s" format migrationKey)
-    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
-    coordinatorSystemProducer.start()
-    coordinatorSystemProducer.send(message)
-    coordinatorSystemProducer.stop()
-  }
-
-}
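
The migration above is idempotent: migrationVerification bootstraps the coordinator stream and bails out early if the SetMigrationMetaMessage is already present, and migrationCompletionMark writes that message once the copy finishes. A sketch of driving the plan by hand, under the assumption that a minimal config like the one below suffices (real job configs carry many more keys; task.checkpoint.system comes from the factory code above):

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConverters._

    // Hypothetical minimal config; a real job config carries many more keys.
    val config = new MapConfig(Map(
      "job.name" -> "my-job",
      "job.id" -> "1",
      "task.checkpoint.system" -> "kafka"
    ).asJava)

    new KafkaCheckpointMigration().migrate(config)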

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..ea8462d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import java.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+  // This might be better as a case class...
+  import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey._
+
+  /**
+   * Serialize this key to bytes
+   * @return Key as bytes
+   */
+  def toBytes(): Array[Byte] = {
+    val jMap = new util.HashMap[String, String](map.size)
+    jMap.putAll(map)
+
+    JSON_MAPPER.writeValueAsBytes(jMap)
+  }
+
+  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log key"))
+
+  /**
+   * Is this key for a checkpoint entry?
+   *
+   * @return true iff this key's entry is for a checkpoint
+   */
+  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
+
+  /**
+   * Is this key for a changelog partition mapping?
+   *
+   * @return true iff this key's entry is for a changelog partition mapping
+   */
+  @Deprecated
+  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
+
+  /**
+   * If this Key is for a checkpoint entry, return its associated TaskName.
+   *
+   * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+   */
+  def getCheckpointTaskName = {
+    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
+    new TaskName(asString)
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: KafkaCheckpointLogKey =>
+      (that canEqual this) &&
+        map == that.map
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(map)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
+
+object KafkaCheckpointLogKey {
+  /**
+   *  Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+   *  type, either a checkpoint or a changelog-partition-mapping.
+   */
+  val CHECKPOINT_KEY_KEY = "type"
+  val CHECKPOINT_KEY_TYPE = "checkpoint"
+
+  @Deprecated
+  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+
+  val CHECKPOINT_TASKNAME_KEY = "taskName"
+  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+  /**
+   * Partition mapping keys have no dynamic values, so we just need one instance.
+   */
+  @Deprecated
+  val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+  private val JSON_MAPPER = new ObjectMapper()
+  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+  var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+  /**
+   * Set the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key.
+   *
+   * @param str Config value of SystemStreamPartition Grouper Factory
+   */
+  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
+    systemStreamPartitionGrouperFactoryString = Some(str)
+  }
+
+  /**
+   * Get the name of the factory configured to provide the SystemStreamPartition grouping
+   * so it can be included in the key
+   */
+  def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
+
+  /**
+   * Build a key for a checkpoint log entry for a particular TaskName
+   * @param taskName TaskName for which to build this checkpoint entry
+   *
+   * @return Key for checkpoint log entry
+   */
+  def getCheckpointKey(taskName:TaskName) = {
+    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
+
+    new KafkaCheckpointLogKey(map)
+  }
+
+  /**
+   * Build a key for a changelog partition mapping entry
+   *
+   * @return Key for changelog partition mapping entry
+   */
+  @Deprecated
+  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
+
+  /**
+   * Deserialize a Kafka checkpoint log key
+   * @param bytes Serialized (via JSON) Kafka checkpoint log key
+   * @return Checkpoint log key
+   */
+  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+    try {
+      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
+        throw new SamzaException("No type entry in checkpoint key: " + jmap)
+      }
+
+      // Only checkpoint keys have ssp grouper factory keys
+      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+        val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+
+        if (sspGrouperFactory == null) {
+          throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
+        }
+
+        if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
+          throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
+        }
+      }
+
+      new KafkaCheckpointLogKey(jmap.toMap)
+    } catch {
+      case e: Exception =>
+        throw new SamzaException("Exception while deserializing checkpoint key", e)
+    }
+  }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
+    ") does not match value from current configuration (" + inConfig + ").  " +
+    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
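
The grouper-factory guard in fromBytes above is what turns a SystemStreamPartitionGrouper change between runs into a fast failure rather than silently misread checkpoints. A small sketch of that failure path (the factory names are illustrative, not real classes):

    import org.apache.samza.SamzaException
    import org.apache.samza.container.TaskName

    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("FactoryA")
    val bytes =
      KafkaCheckpointLogKey.getCheckpointKey(new TaskName("Partition 0")).toBytes()

    // Simulate the job restarting with a different grouper configured:
    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("FactoryB")

    // fromBytes wraps the mismatch in a SamzaException whose cause is a
    // DifferingSystemStreamPartitionGrouperFactoryValues.
    try KafkaCheckpointLogKey.fromBytes(bytes)
    catch { case e: SamzaException => println(e.getCause.getMessage) }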

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..787de1f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Properties
+
+import kafka.api._
+import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
+import kafka.utils.Utils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.container.TaskName
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util._
+
+import scala.collection.mutable
+
+/**
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists.  The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
+ */
+class KafkaCheckpointManager(
+                              clientId: String,
+                              checkpointTopic: String,
+                              val systemName: String,
+                              replicationFactor: Int,
+                              socketTimeout: Int,
+                              bufferSize: Int,
+                              fetchSize: Int,
+                              val metadataStore: TopicMetadataStore,
+                              connectProducer: () => Producer[Array[Byte], Array[Byte]],
+                              val connectZk: () => ZkClient,
+                              systemStreamPartitionGrouperFactoryString: String,
+                              val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+                              serde: CheckpointSerde = new CheckpointSerde,
+                              checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
+  import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._
+
+  var taskNames = Set[TaskName]()
+  var producer: Producer[Array[Byte], Array[Byte]] = null
+  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+
+  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
+  val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
+
+  KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
+
+  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+  /**
+   * Write Checkpoint for specified taskName to log
+   *
+   * @param taskName Specific Samza taskName for which to write a checkpoint.
+   * @param checkpoint Checkpoint object holding the offset data to write.
+   **/
+  override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+    val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
+    val keyBytes = key.toBytes()
+    val msgBytes = serde.toBytes(checkpoint)
+    retryBackoff.run(
+      loop => {
+        if (producer == null) {
+          producer = connectProducer()
+        }
+
+        producer.send(new ProducerRecord(checkpointTopic, 0, keyBytes, msgBytes)).get()
+        loop.done
+      },
+
+      (exception, loop) => {
+        warn("Failed to write %s partition entry %s: %s. Retrying." format(CHECKPOINT_LOG4J_ENTRY, key, exception))
+        debug("Exception detail:", exception)
+        if (producer != null) {
+          producer.close
+        }
+        producer = null
+      }
+    )
+  }
+
+  private def getConsumer(): SimpleConsumer = {
+    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+    val metadata = metadataMap(checkpointTopic)
+    val partitionMetadata = metadata.partitionsMetadata
+      .filter(_.partitionId == 0)
+      .headOption
+      .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+    val leader = partitionMetadata
+      .leader
+      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
+
+    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
+  }
+
+  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
+
+  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
+      .partitionErrorAndOffsets
+      .get(topicAndPartition)
+      .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic))
+    // Fail or retry if there was an issue with the offset request.
+    KafkaUtil.maybeThrowException(offsetResponse.error)
+
+    val offset: Long = offsetResponse
+      .offsets
+      .headOption
+      .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+
+    offset
+  }
+
+  /**
+   * Read the last checkpoint for specified TaskName
+   *
+   * @param taskName Specific Samza taskName for which to get the last checkpoint.
+   **/
+  override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException(taskName + " not registered with this CheckpointManager")
+    }
+
+    info("Reading checkpoint for taskName " + taskName)
+
+    if (taskNamesToOffsets == null) {
+      info("No TaskName to checkpoint mapping provided.  Reading for first time.")
+      taskNamesToOffsets = readCheckpointsFromLog()
+    } else {
+      info("Already existing checkpoint mapping.  Merging new offsets")
+      taskNamesToOffsets ++= readCheckpointsFromLog()
+    }
+
+    val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
+
+    info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
+
+    checkpoint
+  }
+
+  /**
+   * Read through the entire log, discarding changelog mappings, and build a map of TaskNames to Checkpoints
+   */
+  def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+    val checkpoints = mutable.Map[TaskName, Checkpoint]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
+
+    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+      val taskName = checkpointKey.getCheckpointTaskName
+      val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+      debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+      checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
+    }
+
+    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+    checkpoints.toMap /* of the immutable kind */
+  }
+
+
+  /**
+   * Common code for reading both changelog partition mappings and checkpoints from the log
+   *
+   * @param entryType Name of the entry type being read, used in log messages
+   * @param handleEntry Code to handle an entry in the log once it's found
+   */
+  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+    retryBackoff.run[Unit](
+      loop => {
+        val consumer = getConsumer()
+
+        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
+        try {
+          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
+
+          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
+
+          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
+
+          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
+
+          if (offset < 0) {
+            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+            return
+          }
+
+          while (offset < latestOffset) {
+            val request = new FetchRequestBuilder()
+              .addFetch(checkpointTopic, 0, offset, fetchSize)
+              .maxWait(500)
+              .minBytes(1)
+              .clientId(clientId)
+              .build
+
+            val fetchResponse = consumer.fetch(request)
+            if (fetchResponse.hasError) {
+              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+                return
+              }
+              KafkaUtil.maybeThrowException(errorCode)
+            }
+
+            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+              offset = response.nextOffset
+              startingOffset = Some(offset) // For next time we call
+
+              if (!response.message.hasKey) {
+                throw new KafkaUtilException("Encountered message without key.")
+              }
+
+              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+              if (!shouldHandleEntry(checkpointKey)) {
+                debug("Skipping " + entryType + " entry with key " + checkpointKey)
+              } else {
+                handleEntry(response.message.payload, checkpointKey)
+              }
+            }
+          }
+        } finally {
+          consumer.close()
+        }
+
+        loop.done
+        Unit
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: KafkaUtilException => throw e
+          case e: Exception =>
+            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
+            debug("Exception detail:", e)
+        }
+      }
+    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
+
+  }
+
+  def start {
+    kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties)
+    kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1)
+  }
+
+  def register(taskName: TaskName) {
+    debug("Adding taskName " + taskName + " to " + this)
+    taskNames += taskName
+  }
+
+  def stop = {
+    if (producer != null) {
+      producer.close
+    }
+  }
+
+
+  /**
+   * Read through the entire log, discarding checkpoints and finding the latest changelogPartitionMapping.
+   * To be used for migration purposes only. In newer versions, the changelogPartitionMapping is handled through the coordinator stream.
+   */
+  @Deprecated
+  def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
+
+    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+      changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+      debug("Adding changelog partition mapping" + changelogPartitionMapping)
+    }
+
+    readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
+
+    changelogPartitionMapping
+  }
+
+  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
+}
+
+object KafkaCheckpointManager {
+  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
+  val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
+}
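
End to end, the new manager is exercised through the CheckpointManager interface: register the task, start (which now creates and validates the single-partition topic via KafkaUtil instead of failing when it is absent), write, read back, stop. A hedged usage sketch against an already-constructed manager instance (construction wiring omitted; system and stream names are illustrative):

    import org.apache.samza.Partition
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition
    import scala.collection.JavaConverters._

    val task = new TaskName("Partition 0")
    manager.register(task)   // must happen before readLastCheckpoint
    manager.start            // creates and validates the single-partition checkpoint topic

    val ssp = new SystemStreamPartition("kafka", "my-stream", new Partition(0))
    manager.writeCheckpoint(task, new Checkpoint(Map(ssp -> "42").asJava))

    val latest = manager.readLastCheckpoint(task)  // newest message keyed to this task
    manager.stop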

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..7db8940
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import java.util.Properties
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.{Config, KafkaConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+
+object KafkaCheckpointManagerFactory {
+  val INJECTED_PRODUCER_PROPERTIES = Map(
+    "acks" -> "all",
+    // Forcibly disable compression because Kafka doesn't support compression
+    // on log compacted topics. Details in SAMZA-586.
+    "compression.type" -> "none")
+
+  // Set the checkpoint topic configs to have a very small segment size and
+  // enable log compaction. This keeps job startup time small since there 
+  // are fewer useless (overwritten) messages to read from the checkpoint 
+  // topic.
+  def getCheckpointTopicProperties(config: Config) = {
+    val segmentBytes: Int = if (config == null) {
+      KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
+    } else {
+      config.getCheckpointSegmentBytes()
+    }
+    (new Properties /: Map(
+      "cleanup.policy" -> "compact",
+      "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
+  }
+}
+
+class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+  import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory._
+
+  def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
+    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+    val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+    val jobId = config.getJobId.getOrElse("1")
+
+    val systemName = config
+      .getCheckpointSystem
+      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+
+    val producerConfig = config.getKafkaSystemProducerConfig(
+      systemName,
+      clientId,
+      INJECTED_PRODUCER_PROPERTIES)
+    val connectProducer = () => {
+      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    }
+
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    val connectZk = () => {
+      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    }
+    val socketTimeout = consumerConfig.socketTimeoutMs
+
+
+    new KafkaCheckpointManager(
+      clientId,
+      KafkaUtil.getCheckpointTopic(jobName, jobId),
+      systemName,
+      config.getCheckpointReplicationFactor.getOrElse("3").toInt,
+      socketTimeout,
+      consumerConfig.socketReceiveBufferBytes,
+      consumerConfig.fetchMessageMaxBytes,            // must be > buffer size
+      new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout),
+      connectProducer,
+      connectZk,
+      config.getSystemStreamPartitionGrouperFactory,      // the SSPGrouperFactory class is recorded in the checkpoint key so it can be verified on read
+      checkpointTopicProperties = getCheckpointTopicProperties(config))
+  }
+}
\ No newline at end of file
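For reference, the checkpoint topic properties computed by getCheckpointTopicProperties above reduce to two settings. A quick sketch of what it returns; passing null falls back to the default segment size, as the null check in the method shows:

    import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory.getCheckpointTopicProperties

    // With no config, the default 26214400-byte segment size is used.
    val props = getCheckpointTopicProperties(null)
    assert(props.getProperty("cleanup.policy") == "compact")
    assert(props.getProperty("segment.bytes") == "26214400")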

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 798033c..a65e8e8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,6 +40,10 @@ object KafkaConfig {
   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
+  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
   val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
   // The default segment size to use for changelog topics
@@ -54,10 +58,16 @@ object KafkaConfig {
    */
   val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
 
+  val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
+
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }
 
 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+  // checkpoints
+  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+  def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
new file mode 100644
index 0000000..c6b1fe4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.migration
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory}
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.util._
+
+/**
+ * Migrates changelog partition mapping from checkpoint topic to coordinator stream
+ */
+class KafkaCheckpointMigration extends MigrationPlan with Logging {
+  val source = "CHECKPOINTMIGRATION"
+  val migrationKey = "CheckpointMigration09to10"
+  val migrationVal = "true"
+
+  var connectZk: () => ZkClient = null
+
+  private def getCheckpointSystemName(config: Config): String = {
+    config
+      .getCheckpointSystem
+      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+  }
+
+  private def getClientId(config: Config): String = {
+    KafkaUtil.getClientId("samza-checkpoint-manager", config)
+  }
+
+  private def getTopicMetadataStore(config: Config): TopicMetadataStore = {
+    val clientId = getClientId(config)
+    val systemName = getCheckpointSystemName(config)
+
+    val producerConfig = config.getKafkaSystemProducerConfig(
+      systemName,
+      clientId,
+      KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+
+    val consumerConfig = config.getKafkaSystemConsumerConfig(
+      systemName,
+      clientId)
+
+    new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, consumerConfig.socketTimeoutMs)
+  }
+
+  private def getConnectZk(config: Config): () => ZkClient = {
+    val clientId = getClientId(config)
+
+    val checkpointSystemName = getCheckpointSystemName(config)
+
+    val consumerConfig = config.getKafkaSystemConsumerConfig(
+      checkpointSystemName,
+      clientId)
+
+    val zkConnectString = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    () => {
+      new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer)
+    }
+  }
+
+  override def migrate(config: Config) {
+    val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. Cannot proceed with migration."))
+    val jobId = config.getJobId.getOrElse("1")
+
+    val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId)
+
+    val coordinatorSystemFactory = new CoordinatorStreamSystemFactory
+    val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+    val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
+
+    val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config))
+
+    // Only perform the migration when the checkpoint topic exists; otherwise there is nothing to migrate.
+    if (kafkaUtil.topicExists(checkpointTopicName)) {
+      kafkaUtil.validateTopicPartitionCount(
+        checkpointTopicName,
+        getCheckpointSystemName(config),
+        getTopicMetadataStore(config),
+        1)
+
+      if (migrationVerification(coordinatorSystemConsumer)) {
+        info("Migration %s was already performed, doing nothing" format migrationKey)
+        return
+      }
+
+      info("No previous migration for %s were detected, performing migration" format migrationKey)
+
+      info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName)
+      val changelogMap = checkpointManager.readChangeLogPartitionMapping()
+      checkpointManager.stop
+
+      info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId))
+      val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+      changelogPartitionManager.start()
+      changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
+      changelogPartitionManager.stop()
+    }
+    migrationCompletionMark(coordinatorSystemProducer)
+
+  }
+
+  def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
+    coordinatorSystemConsumer.register()
+    coordinatorSystemConsumer.start()
+    coordinatorSystemConsumer.bootstrap()
+    val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
+    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+    stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
+  }
+
+  def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
+    info("Marking completion of migration %s" format migrationKey)
+    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+    coordinatorSystemProducer.start()
+    coordinatorSystemProducer.send(message)
+    coordinatorSystemProducer.stop()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index a7a095b..f4311d1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,16 +19,24 @@
 
 package org.apache.samza.util
 
+import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.kafka.common.PartitionInfo
 import org.apache.samza.config.Config
 import org.apache.samza.config.ConfigException
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.ErrorMapping
-import kafka.common.ReplicaNotAvailableException
+import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException}
+import org.apache.samza.system.kafka.TopicMetadataCache
 
 object KafkaUtil extends Logging {
+  /**
+   * Version number to track the format of the checkpoint log
+   */
+  val CHECKPOINT_LOG_VERSION_NUMBER = 1
   val counter = new AtomicLong(0)
 
   def getClientId(id: String, config: Config): String = getClientId(
@@ -51,6 +59,9 @@ object KafkaUtil extends Logging {
     abs(envelope.getPartitionKey.hashCode()) % numPartitions
   }
 
+  def getCheckpointTopic(jobName: String, jobId: String) =
+    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+
   /**
    * Exactly the same as Kafka's ErrorMapping.maybeThrowException
    * implementation, except suppresses ReplicaNotAvailableException exceptions.
@@ -75,3 +86,102 @@ object KafkaUtil extends Logging {
     code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode
   }
 }
+
+class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+                val connectZk: () => ZkClient) extends Logging {
+  /**
+   * Common code for creating a topic in Kafka
+   *
+   * @param topicName Name of the topic to be created
+   * @param partitionCount  Number of partitions in the topic
+   * @param replicationFactor Number of replicas for the topic
+   * @param topicProperties Any topic related properties
+   */
+  def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties) {
+    info("Attempting to create topic %s." format topicName)
+    retryBackoff.run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(
+            zkClient,
+            topicName,
+            partitionCount,
+            replicationFactor,
+            topicProperties)
+        } finally {
+          zkClient.close
+        }
+
+        info("Created topic %s." format topicName)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case tee: TopicExistsException =>
+            info("Topic %s already exists." format topicName)
+            loop.done
+          case e: Exception =>
+            warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
+            debug("Exception detail:", e)
+        }
+      }
+    )
+  }
+
+  /**
+   * Common code to validate partition count in a topic
+   *
+   * @param topicName Name of the topic to be validated
+   * @param systemName  Kafka system to use
+   * @param metadataStore Topic Metadata store
+   * @param expectedPartitionCount  Expected number of partitions
+   */
+  def validateTopicPartitionCount(topicName: String,
+                                  systemName: String,
+                                  metadataStore: TopicMetadataStore,
+                                  expectedPartitionCount: Int) {
+    info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount))
+    retryBackoff.run(
+      loop => {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topicName)
+        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+
+        val partitionCount = topicMetadata.partitionsMetadata.length
+        if (partitionCount != expectedPartitionCount) {
+          throw new KafkaUtilException("Validation failed for topic %s because partition count %s did not " +
+            "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount))
+        }
+
+        info("Successfully validated topic %s." format topicName)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: KafkaUtilException => throw e
+          case e: Exception =>
+            warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
+            debug("Exception detail:", e)
+        }
+      }
+    )
+  }
+
+  /**
+   * Checks whether the given topic exists in Kafka
+   *
+   * @param topicName Name of the topic
+   * @return true if the topic exists, false otherwise
+   */
+  def topicExists(topicName: String): Boolean = {
+    val zkClient = connectZk()
+    try {
+      AdminUtils.topicExists(zkClient, topicName)
+    } finally {
+      zkClient.close()
+    }
+  }
+}
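A minimal sketch of how this helper is meant to be driven, essentially what KafkaCheckpointManager.start and KafkaCheckpointMigration do above; the ZooKeeper address is a placeholder:

    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient
    import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil}

    // connectZk is a factory so every retry attempt gets a fresh client.
    val connectZk = () => new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, connectZk)

    // Checkpoint topic names are derived from the job; e.g. jobName "my_job",
    // jobId "1" yields "__samza_checkpoint_ver_1_for_my-job_1" (underscores in
    // the name and id are rewritten to dashes).
    val topic = KafkaUtil.getCheckpointTopic("my_job", "1")
    if (!kafkaUtil.topicExists(topic)) {
      kafkaUtil.createTopic(topic, partitionCount = 1, replicationFactor = 3)
    }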

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
new file mode 100644
index 0000000..b9c5dd7
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.apache.samza.SamzaException
+
+/**
+ * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
+ * one to signal a hard failure, and the other to retry. The
+ * KafkaUtilException is thrown to indicate a hard failure that the Kafka
+ * CheckpointManager can't recover from.
+ */
+class KafkaUtilException(var message: String, t: Throwable) extends SamzaException(message, t) {
+  def this(s: String) = this(s, null)
+}
\ No newline at end of file
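To make the retry contract concrete, here is a minimal sketch of the loop shape used by createTopic and validateTopicPartitionCount above; doWork is a hypothetical operation that may fail:

    import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtilException}

    def doWork(): String = ???  // hypothetical attempt that may throw

    val result = (new ExponentialSleepStrategy).run(
      loop => {
        val value = doWork()
        loop.done        // mark success so the strategy stops looping
        value
      },
      (exception, loop) => exception match {
        case e: KafkaUtilException => throw e  // hard failure: abort immediately
        case e: Exception =>                   // transient: fall through and retry
      }
    ).getOrElse(throw new KafkaUtilException("Gave up after retries"))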