You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/04 17:49:04 UTC
samza git commit: SAMZA-1427; use systemFactory in checkpoint manager.
Repository: samza
Updated Branches:
refs/heads/master 1cf98443f -> fd7a57708
SAMZA-1427; use systemFactory in checkpoint manager.
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Jagadish Venkatraman <ja...@apache.org>, Prateek Maheshwari <pm...@linkedin.com>
Closes #299 from sborya/LiKafkaClient
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fd7a5770
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fd7a5770
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fd7a5770
Branch: refs/heads/master
Commit: fd7a57708fd7165c2db74810ca14fc37c50f4cb5
Parents: 1cf9844
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 4 10:48:58 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Oct 4 10:48:58 2017 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/samza/util/Util.scala | 27 +-
.../kafka/KafkaCheckpointManager.scala | 249 ++++++++-----------
.../kafka/KafkaCheckpointManagerFactory.scala | 55 ++--
.../org/apache/samza/config/KafkaConfig.scala | 4 +-
.../kafka/TestKafkaCheckpointManager.scala | 94 +++++--
.../system/kafka/TestKafkaSystemAdmin.scala | 2 +-
.../test/integration/StreamTaskTestUtil.scala | 2 +-
7 files changed, 234 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index d639620..46bdb75 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,29 +19,22 @@
package org.apache.samza.util
-import java.net._
import java.io._
import java.lang.management.ManagementFactory
-import java.util.zip.CRC32
-import org.apache.samza.{SamzaException, Partition}
-import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
+import java.net._
import java.util.Random
+import java.util.zip.CRC32
-import org.apache.samza.config.Config
-import org.apache.samza.config.ConfigException
-import org.apache.samza.config.ConfigRewriter
-import org.apache.samza.config.JobConfig
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.SystemConfig
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config._
+import org.apache.samza.serializers._
+import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition}
+import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConverters._
-import java.io.InputStreamReader
-
-
import scala.collection.immutable.Map
-import org.apache.samza.serializers._
+
object Util extends Logging {
val random = new Random
@@ -195,8 +188,8 @@ object Util extends Logging {
}
/**
- * Generates a coordinator stream name based off of the job name and job id
- * for the jobd. The format is of the stream name will be
+ * Generates a coordinator stream name based on the job name and job id
+ * for the job. The format of the stream name will be:
* __samza_coordinator_<JOBNAME>_<JOBID>.
*/
def getCoordinatorStreamName(jobName: String, jobId: String) = {
@@ -231,7 +224,7 @@ object Util extends Logging {
}
/**
- * Get the Coordinator System and system factory from the configuration
+ * Get the coordinator system and system factory from the configuration
* @param config
* @return
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index c8b7a9b..1e22763 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -21,25 +21,19 @@ package org.apache.samza.checkpoint.kafka
import java.nio.ByteBuffer
import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
-import kafka.api._
-import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
import kafka.utils.ZkUtils
-
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.config.JavaSystemConfig
import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.{StreamSpec, SystemAdmin}
-import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
import org.apache.samza.util._
+import org.apache.samza.{Partition, SamzaException}
+import scala.collection.JavaConversions._
import scala.collection.mutable
/**
@@ -57,22 +51,22 @@ class KafkaCheckpointManager(
socketTimeout: Int,
bufferSize: Int,
fetchSize: Int,
+ getSystemConsumer: () => SystemConsumer,
+ getSystemAdmin: () => SystemAdmin,
val metadataStore: TopicMetadataStore,
- connectProducer: () => Producer[Array[Byte], Array[Byte]],
+ getSystemProducer: () => SystemProducer,
val connectZk: () => ZkUtils,
systemStreamPartitionGrouperFactoryString: String,
failOnCheckpointValidation: Boolean,
val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
serde: CheckpointSerde = new CheckpointSerde,
- checkpointTopicProperties: Properties = new Properties,
- systemAdmin: SystemAdmin = null) extends CheckpointManager with Logging {
- import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._
+ checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
var taskNames = Set[TaskName]()
- var producer: Producer[Array[Byte], Array[Byte]] = null
+ @volatile var systemProducer: SystemProducer = null
var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+ val systemAdmin = getSystemAdmin()
- var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
@@ -91,61 +85,34 @@ class KafkaCheckpointManager(
val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
val keyBytes = key.toBytes()
val msgBytes = serde.toBytes(checkpoint)
+ val systemStream = new SystemStream(systemName, checkpointTopic)
+ val envelope = new OutgoingMessageEnvelope(systemStream, keyBytes, msgBytes)
+
retryBackoff.run(
loop => {
- if (producer == null) {
- producer = connectProducer()
+ if (systemProducer == null) {
+ synchronized {
+ if (systemProducer == null) {
+ systemProducer = getSystemProducer()
+ systemProducer.register(taskName.getTaskName)
+ systemProducer.start
+ }
+ }
}
- producer.send(new ProducerRecord(checkpointTopic, 0, keyBytes, msgBytes)).get()
+ systemProducer.send(taskName.getTaskName, envelope)
+ systemProducer.flush(taskName.getTaskName) // make sure it is written
+ info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
loop.done
},
(exception, loop) => {
- warn("Failed to write %s partition entry %s: %s. Retrying." format(CHECKPOINT_LOG4J_ENTRY, key, exception))
+ warn("Failed to write checkpoint log partition entry %s: %s. Retrying." format(key, exception))
debug("Exception detail:", exception)
- if (producer != null) {
- producer.close
- }
- producer = null
}
)
}
- private def getConsumer(): SimpleConsumer = {
- val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
- val metadata = metadataMap(checkpointTopic)
- val partitionMetadata = metadata.partitionsMetadata
- .filter(_.partitionId == 0)
- .headOption
- .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
- info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
- new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
- }
-
- private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
- private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
- val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
- .partitionErrorAndOffsets
- .get(topicAndPartition)
- .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic))
- // Fail or retry if there was an an issue with the offset request.
- KafkaUtil.maybeThrowException(offsetResponse.error)
-
- val offset: Long = offsetResponse
- .offsets
- .headOption
- .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
- offset
- }
-
/**
* Read the last checkpoint for specified TaskName
*
@@ -184,100 +151,104 @@ class KafkaCheckpointManager(
def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
val taskName = checkpointKey.getCheckpointTaskName
val checkpoint = serde.fromBytes(Utils.readBytes(payload))
- debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
}
- readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+ readLog(shouldHandleEntry, handleCheckpoint)
checkpoints.toMap /* of the immutable kind */
}
+ private def getSSPMetadata(topic: String, partition: Partition): SystemStreamPartitionMetadata = {
+ val metaDataMap: java.util.Map[String, SystemStreamMetadata] = systemAdmin.getSystemStreamMetadata(Collections.singleton(topic))
+ val checkpointMetadata: SystemStreamMetadata = metaDataMap.get(topic)
+ if (checkpointMetadata == null) {
+ throw new SamzaException("Cannot get metadata for system=%s, topic=%s" format(systemName, topic))
+ }
+
+ val partitionMetaData = checkpointMetadata.getSystemStreamPartitionMetadata().get(partition)
+ if (partitionMetaData == null) {
+ throw new SamzaException("Cannot get partitionMetaData for system=%s, topic=%s" format(systemName, topic))
+ }
+
+ return partitionMetaData
+ }
/**
- * Common code for reading both changelog partition mapping and change log
+ * Reads every entry in the checkpoint log, up to the newest offset, and invokes handleEntry on each entry accepted by shouldHandleEntry.
*
- * @param entryType What type of entry to look for within the log key's
* @param handleEntry Code to handle an entry in the log once it's found
*/
- private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+ private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
- retryBackoff.run[Unit](
- loop => {
- val consumer = getConsumer()
-
- val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+ val UNKNOWN_OFFSET = "-1"
+ var attempts = 10
+ val POLL_TIMEOUT = 1000L
+
+ val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
+ val systemConsumer = getSystemConsumer()
+ val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+ // offsets returned are strings
+ val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
+ val oldestOffset = partitionMetadata.getOldestOffset
+ systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
+ systemConsumer.start()
+
+ var msgCount = 0
+ try {
+ val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
+ // convert offsets to long
+ var currentOffset = UNKNOWN_OFFSET.toLong
+ val newestOffsetLong = newestOffset.toLong
+ val sspToPoll = Collections.singleton(ssp)
+ while (currentOffset < newestOffsetLong) {
+
+ val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
try {
- var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
- info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
- val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
- info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
- if (offset < 0) {
- info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
- return
+ systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
+ } catch {
+ case e: Exception => {
+ // these exceptions are most likely intermittent (transient), so retry a limited number of times
+ warn("Got %s exception while polling the consumer for checkpoints." format e)
+ if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
+ attempts -= 1
+ emptyEnvelopes
}
+ }
- while (offset < latestOffset) {
- val request = new FetchRequestBuilder()
- .addFetch(checkpointTopic, 0, offset, fetchSize)
- .maxWait(500)
- .minBytes(1)
- .clientId(clientId)
- .build
-
- val fetchResponse = consumer.fetch(request)
- if (fetchResponse.hasError) {
- warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
- val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
- if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
- warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
- return
- }
- KafkaUtil.maybeThrowException(errorCode)
+ val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
+ val messagesNum = if (messages != null) messages.size else 0
+ info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
+ format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
+ if (envelopes.isEmpty || messagesNum <= 0) {
+ info("Got empty/null list of messages")
+ } else {
+ msgCount += messages.size()
+ // check the key
+ for (msg: IncomingMessageEnvelope <- messages) {
+ val key = msg.getKey.asInstanceOf[Array[Byte]]
+ currentOffset = msg.getOffset().toLong
+ if (key == null) {
+ throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
+ format currentOffset)
}
- for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
- offset = response.nextOffset
- startingOffset = Some(offset) // For next time we call
-
- if (!response.message.hasKey) {
- throw new KafkaUtilException("Encountered message without key.")
- }
-
- val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+ val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
- if (!shouldHandleEntry(checkpointKey)) {
- debug("Skipping " + entryType + " entry with key " + checkpointKey)
- } else {
- handleEntry(response.message.payload, checkpointKey)
- }
+ if (!shouldHandleEntry(checkpointKey)) {
+ info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
+ } else {
+ // handleEntry requires ByteBuffer
+ val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+ handleEntry(checkpointPayload, checkpointKey)
}
}
- } finally {
- consumer.close()
- }
-
- loop.done
- Unit
- },
-
- (exception, loop) => {
- exception match {
- case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: KafkaUtilException => throw e
- case e: Exception =>
- warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
- debug("Exception detail:", e)
}
}
- ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
+ } finally {
+ systemConsumer.stop()
+ }
+ info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
}
override def start {
@@ -290,10 +261,15 @@ class KafkaCheckpointManager(
taskNames += taskName
}
- override def stop = {
- if (producer != null) {
- producer.close
- }
+
+ def stop = {
+ synchronized (
+ if (systemProducer != null) {
+ systemProducer.stop
+ systemProducer = null
+ }
+ )
+
}
override def clearCheckpoints = {
@@ -304,8 +280,3 @@ class KafkaCheckpointManager(
override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
}
-
-object KafkaCheckpointManager {
- val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
- val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 0df581f..402248f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -22,15 +22,14 @@ package org.apache.samza.checkpoint.kafka
import java.util.Properties
import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.{SystemConfig, JavaSystemConfig, Config, KafkaConfig}
+import org.apache.samza.config.{Config, KafkaConfig, SystemConfig}
import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemFactory, SystemAdmin}
-import org.apache.samza.util.{Util, ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _}
+
object KafkaCheckpointManagerFactory {
val INJECTED_PRODUCER_PROPERTIES = Map(
@@ -47,12 +46,29 @@ object KafkaCheckpointManagerFactory {
val segmentBytes: Int = if (config == null) {
KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
} else {
- config.getCheckpointSegmentBytes()
+ new KafkaConfig(config).getCheckpointSegmentBytes()
}
(new Properties /: Map(
"cleanup.policy" -> "compact",
"segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
}
+
+ /**
+ * Get the checkpoint system and system factory from the configuration
+ * @param config
+ * @return system name and system factory
+ */
+ def getCheckpointSystemStreamAndFactory(config: Config) = {
+
+ val kafkaConfig = new KafkaConfig(config)
+ val systemName = kafkaConfig.getCheckpointSystem.getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+
+ val systemFactoryClassName = new SystemConfig(config)
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ (systemName, systemFactory)
+ }
}
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -63,19 +79,17 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
val jobId = config.getJobId.getOrElse("1")
- val systemName = config
- .getCheckpointSystem
- .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+ val (systemName: String, systemFactory : SystemFactory) = getCheckpointSystemStreamAndFactory(config)
- val producerConfig = config.getKafkaSystemProducerConfig(
+ val kafkaConfig = new KafkaConfig(config)
+ val producerConfig = kafkaConfig.getKafkaSystemProducerConfig(
systemName,
clientId,
INJECTED_PRODUCER_PROPERTIES)
- val connectProducer = () => {
- new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
- }
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+ val noOpMetricsRegistry = new NoOpMetricsRegistry()
+
+ val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
val connectZk = () => {
@@ -83,25 +97,22 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
}
val socketTimeout = consumerConfig.socketTimeoutMs
- val systemConfig = new SystemConfig(config)
- val systemFactoryClassName = systemConfig.getSystemFactory(systemName).get
- val systemFactory: SystemFactory = Util.getObj(systemFactoryClassName)
- val systemAdmin = systemFactory.getAdmin(systemName, config)
new KafkaCheckpointManager(
clientId,
KafkaUtil.getCheckpointTopic(jobName, jobId, config),
systemName,
- config.getCheckpointReplicationFactor.getOrElse("3").toInt,
+ kafkaConfig.getCheckpointReplicationFactor.get.toInt,
socketTimeout,
consumerConfig.socketReceiveBufferBytes,
consumerConfig.fetchMessageMaxBytes, // must be > buffer size
+ () => systemFactory.getConsumer(systemName, config, noOpMetricsRegistry),
+ () => systemFactory.getAdmin(systemName, config),
new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout),
- connectProducer,
+ () => systemFactory.getProducer(systemName, config, noOpMetricsRegistry),
connectZk,
config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key
config.failOnCheckpointValidation,
- checkpointTopicProperties = getCheckpointTopicProperties(config),
- systemAdmin = systemAdmin)
+ checkpointTopicProperties = getCheckpointTopicProperties(config))
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 79fd4f3..d926dcb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -273,7 +273,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
injectedProps: Map[String, String] = Map()) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
- val producerProps = new util.HashMap[String, Object]()
+ val producerProps = new util.HashMap[String, String]()
producerProps.putAll(subConf)
producerProps.put("client.id", clientId)
producerProps.putAll(injectedProps.asJava)
@@ -283,7 +283,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
class KafkaProducerConfig(val systemName: String,
val clientId: String = "",
- properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
+ properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
// Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
val RECONNECT_BACKOFF_MS_DEFAULT = 10L
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index a14812e..43b912d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,22 +19,21 @@
package org.apache.samza.checkpoint.kafka
-import kafka.admin.AdminUtils
-import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
-import kafka.message.InvalidMessageException
-import kafka.server.{KafkaConfig, KafkaServer, ConfigType}
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
-import kafka.integration.KafkaServerTestHarness
-
-import org.apache.kafka.common.security.JaasUtils
+import _root_.kafka.admin.AdminUtils
+import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
+import _root_.kafka.integration.KafkaServerTestHarness
+import _root_.kafka.message.InvalidMessageException
+import _root_.kafka.server.{ConfigType, KafkaConfig}
+import _root_.kafka.utils.{CoreUtils, TestUtils, ZkUtils}
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.security.JaasUtils
import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
+import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, TopicMetadataStore}
+import org.apache.samza.system._
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
import org.junit._
@@ -69,23 +68,40 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
+ var systemConsumerFn: ()=>SystemConsumer = ()=>{null}
+ var systemProducerFn: ()=>SystemProducer = ()=>{null}
+ var systemAdminFn: ()=>SystemAdmin = ()=>{null}
+
@Before
override def setUp {
super.setUp
TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
- val config = new java.util.HashMap[String, Object]()
+ val systemName = "kafka"
val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
-
+ val config = new java.util.HashMap[String, String]()
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
config.put("acks", "all")
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava)
- producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+ producerConfig = new KafkaProducerConfig(systemName, "i001", config)
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+
+ config.put(SystemConfig.SYSTEM_FACTORY format systemName, "org.apache.samza.system.kafka.KafkaSystemFactory")
+ config.put(org.apache.samza.config.KafkaConfig.CHECKPOINT_SYSTEM, systemName);
+ config.put(JobConfig.JOB_NAME, "some-job-name");
+ config.put(JobConfig.JOB_ID, "i001");
+ config.put("systems.%s.producer.%s" format (systemName, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), brokers)
+ config.put("systems.%s.consumer.zookeeper.connect" format systemName, zkConnect)
+ val cfg: SystemConfig = new SystemConfig(new MapConfig(config))
+ val (systemStreamName: String, systemConsumerFactory : SystemFactory) =
+ KafkaCheckpointManagerFactory.getCheckpointSystemStreamAndFactory(cfg)
+ systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())}
+ systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())}
+ systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)}
}
@After
@@ -171,6 +187,45 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
kcm.stop
}
+
+ @Test
+ def testCheckpointReadTwice {
+ val kcm = getKafkaCheckpointManager
+ val taskName = new TaskName(partition.toString)
+ kcm.register(taskName)
+ createCheckpointTopic()
+ kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+
+ // check that log compaction is enabled.
+ val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
+ val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
+ zkClient.close
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+ // read before topic exists should result in a null checkpoint
+ var readCp = kcm.readLastCheckpoint(taskName)
+ assertNull(readCp)
+
+ // create topic the first time around
+ writeCheckpoint(taskName, cp1)
+ readCp = kcm.readLastCheckpoint(taskName)
+ assertEquals(cp1, readCp)
+
+ // writing a second message should work, too
+ writeCheckpoint(taskName, cp2)
+ readCp = kcm.readLastCheckpoint(taskName)
+ assertEquals(cp2, readCp)
+ kcm.stop
+
+ // get new KCM for the same stream
+ val kcm1 = getKafkaCheckpointManager
+ kcm1.register(taskName)
+ readCp = kcm1.readLastCheckpoint(taskName)
+ assertEquals(cp2, readCp)
+ kcm1.stop
+ }
+
@Test
def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
@@ -184,9 +239,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
// because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
try {
kcm.readLastCheckpoint(taskName)
- fail("Expected a KafkaUtilException.")
+ fail("Expected an Exception.")
} catch {
case e: KafkaUtilException => None
+ case e: Exception => None
}
kcm.stop
}
@@ -229,8 +285,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
socketTimeout = 30000,
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
+ getSystemConsumer = systemConsumerFn,
+ getSystemAdmin = systemAdminFn,
metadataStore = metadataStore,
- connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+ getSystemProducer = systemProducerFn,
connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
@@ -248,8 +306,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
socketTimeout = 30000,
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
+ getSystemConsumer = systemConsumerFn,
+ getSystemAdmin = systemAdminFn,
metadataStore = metadataStore,
- connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+ getSystemProducer = systemProducerFn,
connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6fb03a1..762e49e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -69,7 +69,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
override def setUp {
super.setUp
- val config = new java.util.HashMap[String, Object]()
+ val config = new java.util.HashMap[String, String]()
brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
http://git-wip-us.apache.org/repos/asf/samza/blob/fd7a5770/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index a007c77..4ba51f3 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -125,7 +125,7 @@ object StreamTaskTestUtil {
jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
"systems.kafka.producer.bootstrap.servers" -> brokers)
- val config = new util.HashMap[String, Object]()
+ val config = new util.HashMap[String, String]()
config.put("bootstrap.servers", brokers)
config.put("request.required.acks", "-1")