Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/04 17:49:04 UTC

samza git commit: SAMZA-1427; use systemFactory in checkpoint manager.

Repository: samza
Updated Branches:
  refs/heads/master 1cf98443f -> fd7a57708


SAMZA-1427; use systemFactory in checkpoint manager.

Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Jagadish Venkatraman <ja...@apache.org>, Prateek Maheshwari <pm...@linkedin.com>

Closes #299 from sborya/LiKafkaClient


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fd7a5770
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fd7a5770
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fd7a5770

Branch: refs/heads/master
Commit: fd7a57708fd7165c2db74810ca14fc37c50f4cb5
Parents: 1cf9844
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 4 10:48:58 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Oct 4 10:48:58 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/util/Util.scala |  27 +-
 .../kafka/KafkaCheckpointManager.scala          | 249 ++++++++-----------
 .../kafka/KafkaCheckpointManagerFactory.scala   |  55 ++--
 .../org/apache/samza/config/KafkaConfig.scala   |   4 +-
 .../kafka/TestKafkaCheckpointManager.scala      |  94 +++++--
 .../system/kafka/TestKafkaSystemAdmin.scala     |   2 +-
 .../test/integration/StreamTaskTestUtil.scala   |   2 +-
 7 files changed, 234 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
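
The gist of the change: KafkaCheckpointManager no longer constructs raw Kafka
clients (SimpleConsumer, KafkaProducer) itself. The factory resolves the
configured SystemFactory and passes the manager thunks that create
SystemConsumer/SystemProducer/SystemAdmin instances on demand, so the manager
owns their lifecycle. A minimal sketch of that wiring, using only calls that
appear in the diff below (the standalone `wire` helper is illustrative, not
part of the commit):

    import org.apache.samza.config.Config
    import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}
    import org.apache.samza.util.{NoOpMetricsRegistry, Util}

    // Defer client creation to lambdas; the checkpoint manager invokes them
    // lazily and is responsible for start/stop.
    def wire(systemName: String, factoryClassName: String, config: Config)
        : (() => SystemConsumer, () => SystemProducer, () => SystemAdmin) = {
      val systemFactory = Util.getObj[SystemFactory](factoryClassName)
      val registry = new NoOpMetricsRegistry()
      (() => systemFactory.getConsumer(systemName, config, registry),
       () => systemFactory.getProducer(systemName, config, registry),
       () => systemFactory.getAdmin(systemName, config))
    }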


http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index d639620..46bdb75 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,29 +19,22 @@
 
 package org.apache.samza.util
 
-import java.net._
 import java.io._
 import java.lang.management.ManagementFactory
-import java.util.zip.CRC32
-import org.apache.samza.{SamzaException, Partition}
-import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
+import java.net._
 import java.util.Random
+import java.util.zip.CRC32
 
-import org.apache.samza.config.Config
-import org.apache.samza.config.ConfigException
-import org.apache.samza.config.ConfigRewriter
-import org.apache.samza.config.JobConfig
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.SystemConfig
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config._
+import org.apache.samza.serializers._
+import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition}
+import org.apache.samza.{Partition, SamzaException}
 
 import scala.collection.JavaConverters._
-import java.io.InputStreamReader
-
-
 import scala.collection.immutable.Map
-import org.apache.samza.serializers._
+
 
 object Util extends Logging {
   val random = new Random
@@ -195,8 +188,8 @@ object Util extends Logging {
   }
 
   /**
-   * Generates a coordinator stream name based off of the job name and job id
-   * for the jobd. The format is of the stream name will be
+   * Generates a coordinator stream name based on the job name and job id
+   * for the job. The format of the stream name will be:
    * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
    */
   def getCoordinatorStreamName(jobName: String, jobId: String) = {
@@ -231,7 +224,7 @@ object Util extends Logging {
   }
 
   /**
-   * Get the Coordinator System and system factory from the configuration
+   * Get the coordinator system and system factory from the configuration
    * @param config
    * @return
    */
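
As a concrete illustration of the naming scheme described in the comment
above (a sketch only; the actual Util.getCoordinatorStreamName may
additionally sanitize the inputs):

    // job name "wikipedia-feed" and job id "1" yield:
    //   __samza_coordinator_wikipedia-feed_1
    val streamName = "__samza_coordinator_%s_%s" format ("wikipedia-feed", "1")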

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index c8b7a9b..1e22763 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -21,25 +21,19 @@ package org.apache.samza.checkpoint.kafka
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 
-import kafka.api._
-import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
 import kafka.utils.ZkUtils
-
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.config.JavaSystemConfig
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.{StreamSpec, SystemAdmin}
-import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
 import org.apache.samza.util._
+import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
@@ -57,22 +51,22 @@ class KafkaCheckpointManager(
                               socketTimeout: Int,
                               bufferSize: Int,
                               fetchSize: Int,
+                              getSystemConsumer: () => SystemConsumer,
+                              getSystemAdmin: () => SystemAdmin,
                               val metadataStore: TopicMetadataStore,
-                              connectProducer: () => Producer[Array[Byte], Array[Byte]],
+                              getSystemProducer: () => SystemProducer,
                               val connectZk: () => ZkUtils,
                               systemStreamPartitionGrouperFactoryString: String,
                               failOnCheckpointValidation: Boolean,
                               val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                               serde: CheckpointSerde = new CheckpointSerde,
-                              checkpointTopicProperties: Properties = new Properties,
-                              systemAdmin: SystemAdmin = null) extends CheckpointManager with Logging {
-  import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._
+                              checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
 
   var taskNames = Set[TaskName]()
-  var producer: Producer[Array[Byte], Array[Byte]] = null
+  @volatile var  systemProducer: SystemProducer = null
   var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+  val systemAdmin = getSystemAdmin()
 
-  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
   val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
 
 
@@ -91,61 +85,34 @@ class KafkaCheckpointManager(
     val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
     val keyBytes = key.toBytes()
     val msgBytes = serde.toBytes(checkpoint)
+    val systemStream = new SystemStream(systemName, checkpointTopic)
+    val envelope = new OutgoingMessageEnvelope(systemStream, keyBytes, msgBytes)
+
     retryBackoff.run(
       loop => {
-        if (producer == null) {
-          producer = connectProducer()
+        if (systemProducer == null) {
+          synchronized {
+            if (systemProducer == null) {
+              systemProducer = getSystemProducer()
+              systemProducer.register(taskName.getTaskName)
+              systemProducer.start
+            }
+          }
         }
 
-        producer.send(new ProducerRecord(checkpointTopic, 0, keyBytes, msgBytes)).get()
+        systemProducer.send(taskName.getTaskName, envelope)
+        systemProducer.flush(taskName.getTaskName) // make sure it is written
+        info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
         loop.done
       },
 
       (exception, loop) => {
-        warn("Failed to write %s partition entry %s: %s. Retrying." format(CHECKPOINT_LOG4J_ENTRY, key, exception))
+        warn("Failed to write checkpoint log partition entry %s: %s. Retrying." format(key, exception))
         debug("Exception detail:", exception)
-        if (producer != null) {
-          producer.close
-        }
-        producer = null
       }
     )
   }
 
-  private def getConsumer(): SimpleConsumer = {
-    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-    val metadata = metadataMap(checkpointTopic)
-    val partitionMetadata = metadata.partitionsMetadata
-      .filter(_.partitionId == 0)
-      .headOption
-      .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
-    val leader = partitionMetadata
-      .leader
-      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
-    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
-    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
-  }
-
-  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
-  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
-    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
-      .partitionErrorAndOffsets
-      .get(topicAndPartition)
-      .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic))
-    // Fail or retry if there was an an issue with the offset request.
-    KafkaUtil.maybeThrowException(offsetResponse.error)
-
-    val offset: Long = offsetResponse
-      .offsets
-      .headOption
-      .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
-    offset
-  }
-
   /**
    * Read the last checkpoint for specified TaskName
    *
@@ -184,100 +151,104 @@ class KafkaCheckpointManager(
     def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
       val taskName = checkpointKey.getCheckpointTaskName
       val checkpoint = serde.fromBytes(Utils.readBytes(payload))
-      debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
       checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
     }
 
-    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+    readLog(shouldHandleEntry, handleCheckpoint)
     checkpoints.toMap /* of the immutable kind */
   }
 
+  private def getSSPMetadata(topic: String, partition: Partition): SystemStreamPartitionMetadata = {
+    val metaDataMap: java.util.Map[String, SystemStreamMetadata] = systemAdmin.getSystemStreamMetadata(Collections.singleton(topic))
+    val checkpointMetadata: SystemStreamMetadata = metaDataMap.get(topic)
+    if (checkpointMetadata == null) {
+      throw new SamzaException("Cannot get metadata for system=%s, topic=%s" format(systemName, topic))
+    }
+
+    val partitionMetaData = checkpointMetadata.getSystemStreamPartitionMetadata().get(partition)
+    if (partitionMetaData == null) {
+      throw new SamzaException("Cannot get partitionMetaData for system=%s, topic=%s" format(systemName, topic))
+    }
+
+    return partitionMetaData
+  }
 
   /**
-   * Common code for reading both changelog partition mapping and change log
+   * Reads the checkpoint log and invokes the provided lambda on each entry that matches shouldHandleEntry.
    *
-   * @param entryType What type of entry to look for within the log key's
    * @param handleEntry Code to handle an entry in the log once it's found
    */
-  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+  private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
                       handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-    retryBackoff.run[Unit](
-      loop => {
-        val consumer = getConsumer()
-
-        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
 
+    val UNKNOWN_OFFSET = "-1"
+    var attempts = 10
+    val POLL_TIMEOUT = 1000L
+
+    val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
+    val systemConsumer = getSystemConsumer()
+    val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+    // offsets returned are strings
+    val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
+    val oldestOffset = partitionMetadata.getOldestOffset
+    systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
+    systemConsumer.start()
+
+    var msgCount = 0
+    try {
+      val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
+      // convert offsets to long
+      var currentOffset = UNKNOWN_OFFSET.toLong
+      val newestOffsetLong = newestOffset.toLong
+      val sspToPoll = Collections.singleton(ssp)
+      while (currentOffset < newestOffsetLong) {
+
+        val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
         try {
-          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
-          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
-          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
-          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
-          if (offset < 0) {
-            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
-            return
+          systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
+        } catch {
+          case e: Exception => {
+            // these exceptions are most likely intermittent
+            warn("Got %s exception while polling the consumer for checkpoints." format e)
+            if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
+            attempts -= 1
+            emptyEnvelopes
           }
+        }
 
-          while (offset < latestOffset) {
-            val request = new FetchRequestBuilder()
-              .addFetch(checkpointTopic, 0, offset, fetchSize)
-              .maxWait(500)
-              .minBytes(1)
-              .clientId(clientId)
-              .build
-
-            val fetchResponse = consumer.fetch(request)
-            if (fetchResponse.hasError) {
-              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
-              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
-              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
-                return
-              }
-              KafkaUtil.maybeThrowException(errorCode)
+        val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
+        val messagesNum = if (messages != null) messages.size else 0
+        info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
+                     format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
+        if (envelopes.isEmpty || messagesNum <= 0) {
+          info("Got empty/null list of messages")
+        } else {
+          msgCount += messages.size()
+          // check the key
+          for (msg: IncomingMessageEnvelope <- messages) {
+            val key = msg.getKey.asInstanceOf[Array[Byte]]
+            currentOffset = msg.getOffset().toLong
+            if (key == null) {
+              throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
+                                                   format currentOffset)
             }
 
-            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
-              offset = response.nextOffset
-              startingOffset = Some(offset) // For next time we call
-
-              if (!response.message.hasKey) {
-                throw new KafkaUtilException("Encountered message without key.")
-              }
-
-              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+            val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
 
-              if (!shouldHandleEntry(checkpointKey)) {
-                debug("Skipping " + entryType + " entry with key " + checkpointKey)
-              } else {
-                handleEntry(response.message.payload, checkpointKey)
-              }
+            if (!shouldHandleEntry(checkpointKey)) {
+              info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
+            } else {
+              // handleEntry requires ByteBuffer
+              val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+              handleEntry(checkpointPayload, checkpointKey)
             }
           }
-        } finally {
-          consumer.close()
-        }
-
-        loop.done
-        Unit
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: KafkaUtilException => throw e
-          case e: Exception =>
-            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
-            debug("Exception detail:", e)
         }
       }
-    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
+    } finally {
+      systemConsumer.stop()
+    }
+    info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
   }
 
   override def start {
@@ -290,10 +261,15 @@ class KafkaCheckpointManager(
     taskNames += taskName
   }
 
-  override def stop = {
-    if (producer != null) {
-      producer.close
-    }
+
+  def stop = {
+    synchronized (
+      if (systemProducer != null) {
+        systemProducer.stop
+        systemProducer = null
+      }
+    )
+
   }
 
   override def clearCheckpoints = {
@@ -304,8 +280,3 @@ class KafkaCheckpointManager(
 
   override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
 }
-
-object KafkaCheckpointManager {
-  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
-  val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
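
The rewritten readLog above replaces the SimpleConsumer fetch loop with a
system-agnostic SystemConsumer poll loop: register the checkpoint SSP at its
oldest offset, then poll until the consumed offset reaches the newest offset
reported by the SystemAdmin. A condensed sketch of that pattern (the `drain`
helper and its parameters are illustrative, not part of the commit):

    import java.util.Collections
    import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumer, SystemStreamPartition}
    import scala.collection.JavaConversions._

    // Read every message currently in the stream, oldest through newest,
    // invoking `handle` on each; stops once the newest offset is consumed.
    def drain(consumer: SystemConsumer, ssp: SystemStreamPartition,
              oldestOffset: String, newestOffset: String)
             (handle: IncomingMessageEnvelope => Unit): Unit = {
      consumer.register(ssp, oldestOffset) // always start from the beginning
      consumer.start()
      try {
        var currentOffset = -1L
        val target = Option(newestOffset).getOrElse("-1").toLong // null => empty stream
        while (currentOffset < target) {
          val envelopes = consumer.poll(Collections.singleton(ssp), 1000L)
          val messages = envelopes.get(ssp)
          if (messages != null) {
            for (msg <- messages) {
              currentOffset = msg.getOffset.toLong
              handle(msg)
            }
          }
        }
      } finally {
        consumer.stop()
      }
    }

Note the trade-off visible in the diff: the old code retried via
ExponentialSleepStrategy, while the new loop caps retries at a fixed attempt
count around poll() and rethrows as a SamzaException once exhausted.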

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 0df581f..402248f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -22,15 +22,14 @@ package org.apache.samza.checkpoint.kafka
 import java.util.Properties
 
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.{SystemConfig, JavaSystemConfig, Config, KafkaConfig}
+import org.apache.samza.config.{Config, KafkaConfig, SystemConfig}
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemFactory, SystemAdmin}
-import org.apache.samza.util.{Util, ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _}
+
 
 object KafkaCheckpointManagerFactory {
   val INJECTED_PRODUCER_PROPERTIES = Map(
@@ -47,12 +46,29 @@ object KafkaCheckpointManagerFactory {
     val segmentBytes: Int = if (config == null) {
       KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
     } else {
-      config.getCheckpointSegmentBytes()
+      new KafkaConfig(config).getCheckpointSegmentBytes()
     }
     (new Properties /: Map(
       "cleanup.policy" -> "compact",
       "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
   }
+
+  /**
+   * Get the checkpoint system and system factory from the configuration
+   * @param config
+   * @return system name and system factory
+   */
+  def getCheckpointSystemStreamAndFactory(config: Config) = {
+
+    val kafkaConfig = new KafkaConfig(config)
+    val systemName = kafkaConfig.getCheckpointSystem.getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+
+    val systemFactoryClassName = new SystemConfig(config)
+            .getSystemFactory(systemName)
+            .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    (systemName, systemFactory)
+  }
 }
 
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -63,19 +79,17 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
     val jobId = config.getJobId.getOrElse("1")
 
-    val systemName = config
-      .getCheckpointSystem
-      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+    val (systemName: String, systemFactory : SystemFactory) =  getCheckpointSystemStreamAndFactory(config)
 
-    val producerConfig = config.getKafkaSystemProducerConfig(
+    val kafkaConfig = new KafkaConfig(config)
+    val producerConfig = kafkaConfig.getKafkaSystemProducerConfig(
       systemName,
       clientId,
       INJECTED_PRODUCER_PROPERTIES)
-    val connectProducer = () => {
-      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
-    }
 
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val noOpMetricsRegistry = new NoOpMetricsRegistry()
+
+    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
     val connectZk = () => {
@@ -83,25 +97,22 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     }
     val socketTimeout = consumerConfig.socketTimeoutMs
 
-    val systemConfig = new SystemConfig(config)
-    val systemFactoryClassName = systemConfig.getSystemFactory(systemName).get
-    val systemFactory: SystemFactory = Util.getObj(systemFactoryClassName)
-    val systemAdmin = systemFactory.getAdmin(systemName, config)
 
     new KafkaCheckpointManager(
       clientId,
       KafkaUtil.getCheckpointTopic(jobName, jobId, config),
       systemName,
-      config.getCheckpointReplicationFactor.getOrElse("3").toInt,
+      kafkaConfig.getCheckpointReplicationFactor.get.toInt,
       socketTimeout,
       consumerConfig.socketReceiveBufferBytes,
       consumerConfig.fetchMessageMaxBytes,            // must be > buffer size
+      () => systemFactory.getConsumer(systemName, config, noOpMetricsRegistry),
+      () => systemFactory.getAdmin(systemName, config),
       new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout),
-      connectProducer,
+      () => systemFactory.getProducer(systemName, config, noOpMetricsRegistry),
       connectZk,
       config.getSystemStreamPartitionGrouperFactory,      // To find out the SSPGrouperFactory class so it can be included/verified in the key
       config.failOnCheckpointValidation,
-      checkpointTopicProperties = getCheckpointTopicProperties(config),
-      systemAdmin = systemAdmin)
+      checkpointTopicProperties = getCheckpointTopicProperties(config))
   }
 }
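
Usage-wise, the new getCheckpointSystemStreamAndFactory helper needs two
pieces of configuration: the checkpoint system name (KafkaConfig.CHECKPOINT_SYSTEM)
and that system's factory class (SystemConfig.SYSTEM_FACTORY). A minimal
sketch (the literal key strings below are spelled out for illustration and
assumed to match those constants):

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConverters._

    val config = new MapConfig(Map(
      "task.checkpoint.system" -> "kafka",
      "systems.kafka.samza.factory" ->
        "org.apache.samza.system.kafka.KafkaSystemFactory"
    ).asJava)

    // Returns ("kafka", an instantiated KafkaSystemFactory), or throws a
    // SamzaException naming the missing key.
    val (systemName, systemFactory) =
      KafkaCheckpointManagerFactory.getCheckpointSystemStreamAndFactory(config)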

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 79fd4f3..d926dcb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -273,7 +273,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
                                     injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
-    val producerProps = new util.HashMap[String, Object]()
+    val producerProps = new util.HashMap[String, String]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
     producerProps.putAll(injectedProps.asJava)
@@ -283,7 +283,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
 class KafkaProducerConfig(val systemName: String,
                           val clientId: String = "",
-                          properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
+                          properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
 
   // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
   val RECONNECT_BACKOFF_MS_DEFAULT = 10L
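
The Map[String, Object] -> Map[String, String] narrowing above (and the
matching test changes further down) reflects that every producer property
here originates from Config, whose values are already strings. A small
sketch of constructing the config with the narrowed type (the property
values are hypothetical):

    import java.util
    import org.apache.samza.config.KafkaProducerConfig

    val props = new util.HashMap[String, String]()
    props.put("bootstrap.servers", "localhost:9092") // hypothetical brokers
    props.put("acks", "all")
    val producerConfig = new KafkaProducerConfig("kafka", "client-1", props)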

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index a14812e..43b912d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,22 +19,21 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import kafka.admin.AdminUtils
-import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
-import kafka.message.InvalidMessageException
-import kafka.server.{KafkaConfig, KafkaServer, ConfigType}
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
-import kafka.integration.KafkaServerTestHarness
-
-import org.apache.kafka.common.security.JaasUtils
+import _root_.kafka.admin.AdminUtils
+import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
+import _root_.kafka.integration.KafkaServerTestHarness
+import _root_.kafka.message.InvalidMessageException
+import _root_.kafka.server.{ConfigType, KafkaConfig}
+import _root_.kafka.utils.{CoreUtils, TestUtils, ZkUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
+import org.apache.samza.config._
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, TopicMetadataStore}
+import org.apache.samza.system._
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore}
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
@@ -69,23 +68,40 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
   val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
 
+  var systemConsumerFn: ()=>SystemConsumer = ()=>{null}
+  var systemProducerFn: ()=>SystemProducer = ()=>{null}
+  var systemAdminFn: ()=>SystemAdmin = ()=>{null}
+
   @Before
   override def setUp {
     super.setUp
 
     TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
 
-    val config = new java.util.HashMap[String, Object]()
+    val systemName = "kafka"
     val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
-
+    val config = new java.util.HashMap[String, String]()
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
     config.put("acks", "all")
     config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
     config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava)
-    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+    producerConfig = new KafkaProducerConfig(systemName, "i001", config)
 
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+
+    config.put(SystemConfig.SYSTEM_FACTORY format systemName, "org.apache.samza.system.kafka.KafkaSystemFactory")
+    config.put(org.apache.samza.config.KafkaConfig.CHECKPOINT_SYSTEM, systemName);
+    config.put(JobConfig.JOB_NAME, "some-job-name");
+    config.put(JobConfig.JOB_ID, "i001");
+    config.put("systems.%s.producer.%s" format (systemName, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), brokers)
+    config.put("systems.%s.consumer.zookeeper.connect" format systemName, zkConnect)
+    val cfg: SystemConfig = new SystemConfig(new MapConfig(config))
+    val (systemStreamName: String, systemConsumerFactory : SystemFactory) =
+      KafkaCheckpointManagerFactory.getCheckpointSystemStreamAndFactory(cfg)
+    systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())}
+    systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())}
+    systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)}
   }
 
   @After
@@ -171,6 +187,45 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     kcm.stop
   }
 
+
+  @Test
+  def testCheckpointReadTwice {
+    val kcm = getKafkaCheckpointManager
+    val taskName = new TaskName(partition.toString)
+    kcm.register(taskName)
+    createCheckpointTopic()
+    kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+
+    // check that log compaction is enabled.
+    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
+    val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
+    zkClient.close
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+    // read before topic exists should result in a null checkpoint
+    var readCp = kcm.readLastCheckpoint(taskName)
+    assertNull(readCp)
+
+    // create topic the first time around
+    writeCheckpoint(taskName, cp1)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp1, readCp)
+
+    // writing a second message should work, too
+    writeCheckpoint(taskName, cp2)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp2, readCp)
+    kcm.stop
+
+    // get new KCM for the same stream
+    val kcm1 = getKafkaCheckpointManager
+    kcm1.register(taskName)
+    readCp = kcm1.readLastCheckpoint(taskName)
+    assertEquals(cp2, readCp)
+    kcm1.stop
+  }
+
   @Test
   def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
     val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
@@ -184,9 +239,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
       try {
         kcm.readLastCheckpoint(taskName)
-        fail("Expected a KafkaUtilException.")
+        fail("Expected an Exception.")
       } catch {
         case e: KafkaUtilException => None
+        case e: Exception => None
       }
       kcm.stop
     }
@@ -229,8 +285,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     socketTimeout = 30000,
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
+    getSystemConsumer = systemConsumerFn,
+    getSystemAdmin = systemAdminFn,
     metadataStore = metadataStore,
-    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+    getSystemProducer = systemProducerFn,
     connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
@@ -248,8 +306,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     socketTimeout = 30000,
     bufferSize = 64 * 1024,
     fetchSize = 300 * 1024,
+    getSystemConsumer = systemConsumerFn,
+    getSystemAdmin = systemAdminFn,
     metadataStore = metadataStore,
-    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+    getSystemProducer = systemProducerFn,
     connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6fb03a1..762e49e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -69,7 +69,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   override def setUp {
     super.setUp
 
-    val config = new java.util.HashMap[String, Object]()
+    val config = new java.util.HashMap[String, String]()
 
     brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index a007c77..4ba51f3 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -125,7 +125,7 @@ object StreamTaskTestUtil {
     jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
       "systems.kafka.producer.bootstrap.servers" -> brokers)
 
-    val config = new util.HashMap[String, Object]()
+    val config = new util.HashMap[String, String]()
 
     config.put("bootstrap.servers", brokers)
     config.put("request.required.acks", "-1")