Posted to commits@samza.apache.org by bo...@apache.org on 2018/10/11 21:27:07 UTC
[2/3] samza git commit: SAMZA-1868: Create new SamzaAdmin for Kafka
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
deleted file mode 100644
index 6ab4d32..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ /dev/null
@@ -1,608 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import java.util
-import java.util.{Properties, UUID}
-
-import com.google.common.annotations.VisibleForTesting
-import kafka.admin.{AdminClient, AdminUtils}
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-import kafka.utils.ZkUtils
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.kafka.common.TopicPartition
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system._
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
-import org.apache.samza.{Partition, SamzaException}
-
-import scala.collection.JavaConverters._
-
-
-object KafkaSystemAdmin extends Logging {
-
- @VisibleForTesting @volatile var deleteMessagesCalled = false
- val CLEAR_STREAM_RETRIES = 3
-
- /**
- * A helper method that takes oldest, newest, and upcoming offsets for each
- * system stream partition, and creates a single map from stream name to
- * SystemStreamMetadata.
- */
- def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = {
- val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet)
- .groupBy(_.getStream)
- .map {
- case (streamName, systemStreamPartitions) =>
- val streamPartitionMetadata = systemStreamPartitions
- .map(systemStreamPartition => {
- val partitionMetadata = new SystemStreamPartitionMetadata(
- // If the topic/partition is empty then oldest and newest will
- // be stripped of their offsets, so default to null.
- oldestOffsets.getOrElse(systemStreamPartition, null),
- newestOffsets.getOrElse(systemStreamPartition, null),
- upcomingOffsets(systemStreamPartition))
- (systemStreamPartition.getPartition, partitionMetadata)
- })
- .toMap
- val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
- (streamName, streamMetadata)
- }
- .toMap
-
- // This is typically printed downstream and it can be spammy, so debug level here.
- debug("Got metadata: %s" format allMetadata)
-
- allMetadata
- }
-}
-
-/**
- * A helper class that is used to construct the changelog stream specific information
- *
- * @param replicationFactor The number of replicas for the changelog stream
- * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
- */
-case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
-
-/**
- * A Kafka-based implementation of SystemAdmin.
- */
-class KafkaSystemAdmin(
- /**
- * The system name to use when creating SystemStreamPartitions to return in
- * the getSystemStreamMetadata response.
- */
- systemName: String,
-
- // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
- /**
- * List of brokers that are part of the Kafka system that we wish to
- * interact with. The format is host1:port1,host2:port2.
- */
- brokerListString: String,
-
- /**
- * A method that returns a ZkUtils for the Kafka system. This is invoked
- * when the system admin is attempting to create a coordinator stream.
- */
- connectZk: () => ZkUtils,
-
- /**
- * Custom properties to use when the system admin tries to create a new
- * coordinator stream.
- */
- coordinatorStreamProperties: Properties = new Properties,
-
- /**
- * The replication factor to use when the system admin creates a new
- * coordinator stream.
- */
- coordinatorStreamReplicationFactor: Int = 1,
-
- /**
- * The timeout to use for the simple consumer when fetching metadata from
- * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
- */
- timeout: Int = Int.MaxValue,
-
- /**
- * The buffer size to use for the simple consumer when fetching metadata
- * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
- * configuration.
- */
- bufferSize: Int = ConsumerConfig.SocketBufferSize,
-
- /**
- * The client ID to use for the simple consumer when fetching metadata from
- * Kafka. Equivalent to Kafka's client.id configuration.
- */
- clientId: String = UUID.randomUUID.toString,
-
- /**
- * Replication factor for the Changelog topic in kafka
- * Kafka properties to be used during the Changelog topic creation
- */
- topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](),
-
- /**
- * Kafka properties to be used during the intermediate topic creation
- */
- intermediateStreamProperties: Map[String, Properties] = Map(),
-
- /**
- * Whether deleteMessages() API can be used
- */
- deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
-
- import KafkaSystemAdmin._
-
- @volatile var running = false
- @volatile var adminClient: AdminClient = null
-
- override def start() = {
- if (!running) {
- running = true
- adminClient = createAdminClient()
- }
- }
-
- override def stop() = {
- if (running) {
- running = false
- adminClient.close()
- adminClient = null
- }
- }
-
- private def createAdminClient(): AdminClient = {
- val props = new Properties()
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListString)
- AdminClient.create(props)
- }
-
- override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
- getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
- }
-
- def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = {
- debug("Fetching system stream partition count for: %s" format streams)
- var metadataTTL = cacheTTL
- retryBackoff.run(
- loop => {
- val metadata = TopicMetadataCache.getTopicMetadata(
- streams.asScala.toSet,
- systemName,
- getTopicMetadata,
- metadataTTL)
- val result = metadata.map {
- case (topic, topicMetadata) => {
- KafkaUtil.maybeThrowException(topicMetadata.error.exception())
- val partitionsMap = topicMetadata.partitionsMetadata.map {
- pm =>
- new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
- }.toMap[Partition, SystemStreamPartitionMetadata]
- (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava))
- }
- }
- loop.done
- result.asJava
- },
-
- (exception, loop) => {
- warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
- debug("Exception detail:", exception)
- if (metadataTTL == Long.MaxValue) {
- metadataTTL = 5000 // Revert to the default cache expiration
- }
- }
- ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
- }
-
- /**
- * Returns the offset for the message after the specified offset for each
- * SystemStreamPartition that was passed in.
- */
-
- override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
- // This is safe to do with Kafka, even if a topic is key-deduped. If the
- // offset doesn't exist on a compacted topic, Kafka will return the first
- // message AFTER the offset that was specified in the fetch request.
- offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
- }
-
- override def getSystemStreamMetadata(streams: java.util.Set[String]) =
- getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava
-
- /**
- * Given a set of stream names (topics), fetch metadata from Kafka for each
- * stream, and return a map from stream name to SystemStreamMetadata for
- * each stream. This method will return null for oldest and newest offsets
- * if a given SystemStreamPartition is empty. This method will block and
- * retry indefinitely until it gets a successful response from Kafka.
- */
- def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
- debug("Fetching system stream metadata for: %s" format streams)
- var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
- retryBackoff.run(
- loop => {
- val metadata = TopicMetadataCache.getTopicMetadata(
- streams.asScala.toSet,
- systemName,
- getTopicMetadata,
- metadataTTL)
-
- debug("Got metadata for streams: %s" format metadata)
-
- val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
- var oldestOffsets = Map[SystemStreamPartition, String]()
- var newestOffsets = Map[SystemStreamPartition, String]()
- var upcomingOffsets = Map[SystemStreamPartition, String]()
-
- // Get oldest, newest, and upcoming offsets for each topic and partition.
- for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
- debug("Fetching offsets for %s:%s: %s" format (broker.host, broker.port, topicsAndPartitions))
-
- val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
- try {
- upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime)
- oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
-
- // Kafka's "latest" offset is always last message in stream's offset +
- // 1, so get newest message in stream by subtracting one. This is safe
- // even for key-deduplicated streams, since the last message will
- // never be deduplicated.
- newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString)
- // Keep only oldest/newest offsets where there is a message. Should
- // return null offsets for empty streams.
- upcomingOffsets.foreach {
- case (topicAndPartition, offset) =>
- if (offset.toLong <= 0) {
- debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
- newestOffsets -= topicAndPartition
- debug("Setting oldest offset to 0 to consume from beginning")
- oldestOffsets += (topicAndPartition -> "0")
- }
- }
- } finally {
- consumer.close
- }
- }
-
- val result = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
- loop.done
- result
- },
-
- (exception, loop) => {
- warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
- debug("Exception detail:", exception)
- metadataTTL = 5000 // Revert to the default cache expiration
- }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
- }
-
- /**
- * Returns the newest offset for the specified SSP.
- * This method is fast and targeted. It minimizes the number of kafka requests.
- * It does not retry indefinitely if there is any failure.
- * It returns null if the topic is empty. To get the offsets for *all*
- * partitions, it would be more efficient to call getSystemStreamMetadata
- */
- override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
- debug("Fetching newest offset for: %s" format ssp)
- var offset: String = null
- var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
- var retries = maxRetries
- new ExponentialSleepStrategy().run(
- loop => {
- val metadata = TopicMetadataCache.getTopicMetadata(
- Set(ssp.getStream),
- systemName,
- getTopicMetadata,
- metadataTTL)
- debug("Got metadata for streams: %s" format metadata)
-
- val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
- val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
- val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
-
- // Get oldest, newest, and upcoming offsets for each topic and partition.
- debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
- val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
- try {
- offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
-
- // Kafka's "latest" offset is always last message in stream's offset +
- // 1, so get newest message in stream by subtracting one. This is safe
- // even for key-deduplicated streams, since the last message will
- // never be deduplicated.
- if (offset.toLong <= 0) {
- debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
- offset = null
- } else {
- offset = (offset.toLong - 1).toString
- }
- } finally {
- consumer.close
- }
-
- debug("Got offset %s for %s." format(offset, ssp))
- loop.done
- },
-
- (exception, loop) => {
- if (retries > 0) {
- warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception))
- metadataTTL = 0L // Force metadata refresh
- retries -= 1
- } else {
- warn("Exception while trying to get offset for %s" format(ssp), exception)
- loop.done
- throw exception
- }
- })
-
- offset
- }
-
- /**
- * Helper method to use topic metadata cache when fetching metadata, so we
- * don't hammer Kafka more than we need to.
- */
- def getTopicMetadata(topics: Set[String]) = {
- new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
- .getTopicInfo(topics)
- }
-
- /**
- * Break topic metadata topic/partitions into per-broker map so that we can
- * execute only one offset request per broker.
- */
- private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = {
- val brokersToTopicPartitions = metadata
- .values
- // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
- .flatMap(topicMetadata => {
- KafkaUtil.maybeThrowException(topicMetadata.error.exception())
- topicMetadata
- .partitionsMetadata
- // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
- .map(partitionMetadata => {
- val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
- (leader, topicAndPartition)
- })
- })
-
- // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
- .groupBy(_._1)
- // Convert to a Map[Broker, Set[TopicAndPartition]]
- .mapValues(_.map(_._2).toSet)
-
- debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions)
-
- brokersToTopicPartitions
- }
-
- /**
- * Use a SimpleConsumer to fetch either the earliest or latest offset from
- * Kafka for each topic/partition in the topicsAndPartitions set. It is
- * assumed that all topics/partitions supplied reside on the broker that the
- * consumer is connected to.
- */
- private def getOffsets(consumer: SimpleConsumer, topicsAndPartitions: Set[TopicAndPartition], earliestOrLatest: Long) = {
- debug("Getting offsets for %s using earliest/latest value of %s." format (topicsAndPartitions, earliestOrLatest))
-
- var offsets = Map[SystemStreamPartition, String]()
- val partitionOffsetInfo = topicsAndPartitions
- .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(earliestOrLatest, 1)))
- .toMap
- val brokerOffsets = consumer
- .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
- .partitionErrorAndOffsets
- .mapValues(partitionErrorAndOffset => {
- KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception())
- partitionErrorAndOffset.offsets.head
- })
-
- for ((topicAndPartition, offset) <- brokerOffsets) {
- offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString
- }
-
- debug("Got offsets for %s using earliest/latest value of %s: %s" format (topicsAndPartitions, earliestOrLatest, offsets))
-
- offsets
- }
-
- /**
- * @inheritdoc
- */
- override def createStream(spec: StreamSpec): Boolean = {
- info("Create topic %s in system %s" format (spec.getPhysicalName, systemName))
- val kSpec = toKafkaSpec(spec)
- var streamCreated = false
-
- new ExponentialSleepStrategy(initialDelayMs = 500).run(
- loop => {
- val zkClient = connectZk()
- try {
- AdminUtils.createTopic(
- zkClient,
- kSpec.getPhysicalName,
- kSpec.getPartitionCount,
- kSpec.getReplicationFactor,
- kSpec.getProperties)
- } finally {
- zkClient.close
- }
-
- streamCreated = true
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: TopicExistsException =>
- streamCreated = false
- loop.done
- case e: Exception =>
- warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e))
- debug("Exception detail:", e)
- }
- })
-
- streamCreated
- }
-
- /**
- * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog streams.
- * @param spec a StreamSpec object
- * @return KafkaStreamSpec object
- */
- def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = {
- if (spec.isChangeLogStream) {
- val topicName = spec.getPhysicalName
- val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName))
- new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor,
- topicMeta.kafkaProps)
- } else if (spec.isCoordinatorStream){
- new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor,
- coordinatorStreamProperties)
- } else if (intermediateStreamProperties.contains(spec.getId)) {
- KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
- } else {
- KafkaStreamSpec.fromSpec(spec)
- }
- }
-
- /**
- * @inheritdoc
- *
- * Validates a stream in Kafka. Should not be called before createStream(),
- * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients,
- * is not read-only and will auto-create a new topic.
- */
- override def validateStream(spec: StreamSpec): Unit = {
- val topicName = spec.getPhysicalName
- info("Validating topic %s." format topicName)
-
- val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
- var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
- retryBackoff.run(
- loop => {
- val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
- val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
- val topicMetadata = topicMetadataMap(topicName)
- KafkaUtil.maybeThrowException(topicMetadata.error.exception())
-
- val partitionCount = topicMetadata.partitionsMetadata.length
- if (partitionCount != spec.getPartitionCount) {
- throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount))
- }
-
- info("Successfully validated topic %s." format topicName)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: StreamValidationException => throw e
- case e: Exception =>
- warn("While trying to validate topic %s: %s. Retrying." format (topicName, e))
- debug("Exception detail:", e)
- metadataTTL = 5000L // Revert to the default value
- }
- })
- }
-
- /**
- * @inheritdoc
- *
- * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
- * Otherwise it's a no-op.
- */
- override def clearStream(spec: StreamSpec): Boolean = {
- info("Delete topic %s in system %s" format (spec.getPhysicalName, systemName))
- val kSpec = KafkaStreamSpec.fromSpec(spec)
- var retries = CLEAR_STREAM_RETRIES
- new ExponentialSleepStrategy().run(
- loop => {
- val zkClient = connectZk()
- try {
- AdminUtils.deleteTopic(
- zkClient,
- kSpec.getPhysicalName)
- } finally {
- zkClient.close
- }
-
- loop.done
- },
-
- (exception, loop) => {
- if (retries > 0) {
- warn("Exception while trying to delete topic %s: %s. Retrying." format (spec.getPhysicalName, exception))
- retries -= 1
- } else {
- warn("Fail to delete topic %s: %s" format (spec.getPhysicalName, exception))
- loop.done
- throw exception
- }
- })
-
- val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get
- topicMetadata.partitionsMetadata.isEmpty
- }
-
- /**
- * @inheritdoc
- *
- * Delete records up to (and including) the provided SSP offsets for all system stream partitions specified in the map.
- * This only works with Kafka clusters on version 0.11 or later. Otherwise it's a no-op.
- */
- override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
- if (!running) {
- throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
- }
- if (deleteCommittedMessages) {
- val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) =>
- (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
- }.toMap
- adminClient.deleteRecordsBefore(nextOffsets)
- deleteMessagesCalled = true
- }
- }
-
- /**
- * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
- * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
- *
- * Currently it's used in the context of the broadcast streams to detect
- * the mismatch between two streams when consuming the broadcast streams.
- */
- override def offsetComparator(offset1: String, offset2: String): Integer = {
- offset1.toLong compare offset2.toLong
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala
new file mode 100644
index 0000000..6ff2b50
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import java.util
+import java.util.Properties
+
+import kafka.admin.{AdminClient, AdminUtils}
+import kafka.utils.{Logging, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
+import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.ExponentialSleepStrategy
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+
+/**
+ * A helper class that is used to construct the changelog stream specific information
+ *
+ * @param replicationFactor The number of replicas for the changelog stream
+ * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
+ */
+case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
+
+
+// TODO move to org.apache.kafka.clients.admin.AdminClient from kafka.admin.AdminClient
+object KafkaSystemAdminUtilsScala extends Logging {
+
+ val CLEAR_STREAM_RETRIES = 3
+ val CREATE_STREAM_RETRIES = 10
+
+ /**
+ * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
+ * Otherwise it's a no-op.
+ */
+ def clearStream(spec: StreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Unit = {
+ info("Deleting topic %s for system %s" format(spec.getPhysicalName, spec.getSystemName))
+ val kSpec = KafkaStreamSpec.fromSpec(spec)
+ var retries = CLEAR_STREAM_RETRIES
+ new ExponentialSleepStrategy().run(
+ loop => {
+ val zkClient = connectZk.get()
+ try {
+ AdminUtils.deleteTopic(
+ zkClient,
+ kSpec.getPhysicalName)
+ } finally {
+ zkClient.close
+ }
+
+ loop.done
+ },
+
+ (exception, loop) => {
+ if (retries > 0) {
+ warn("Exception while trying to delete topic %s. Retrying." format (spec.getPhysicalName), exception)
+ retries -= 1
+ } else {
+ warn("Fail to delete topic %s." format (spec.getPhysicalName), exception)
+ loop.done
+ throw exception
+ }
+ })
+ }
+
+
+ def createStream(kSpec: KafkaStreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Boolean = {
+ info("Creating topic %s for system %s" format(kSpec.getPhysicalName, kSpec.getSystemName))
+ var streamCreated = false
+ var retries = CREATE_STREAM_RETRIES
+
+ new ExponentialSleepStrategy(initialDelayMs = 500).run(
+ loop => {
+ val zkClient = connectZk.get()
+ try {
+ AdminUtils.createTopic(
+ zkClient,
+ kSpec.getPhysicalName,
+ kSpec.getPartitionCount,
+ kSpec.getReplicationFactor,
+ kSpec.getProperties)
+ } finally {
+ zkClient.close
+ }
+
+ streamCreated = true
+ loop.done
+ },
+
+ (exception, loop) => {
+ exception match {
+ case e: TopicExistsException =>
+ streamCreated = false
+ loop.done
+ case e: Exception =>
+ if (retries > 0) {
+ warn("Failed to create topic %s. Retrying." format (kSpec.getPhysicalName), exception)
+ retries -= 1
+ } else {
+ error("Failed to create topic %s. Bailing out." format (kSpec.getPhysicalName), exception)
+ throw exception
+ }
+ }
+ })
+
+ streamCreated
+ }
+
+ /**
+ * A helper method that takes oldest, newest, and upcoming offsets for each
+ * system stream partition, and creates a single map from stream name to
+ * SystemStreamMetadata.
+ */
+ def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = {
+ val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet)
+ .groupBy(_.getStream)
+ .map {
+ case (streamName, systemStreamPartitions) =>
+ val streamPartitionMetadata = systemStreamPartitions
+ .map(systemStreamPartition => {
+ val partitionMetadata = new SystemStreamPartitionMetadata(
+ // If the topic/partition is empty then oldest and newest will
+ // be stripped of their offsets, so default to null.
+ oldestOffsets.getOrElse(systemStreamPartition, null),
+ newestOffsets.getOrElse(systemStreamPartition, null),
+ upcomingOffsets(systemStreamPartition))
+ (systemStreamPartition.getPartition, partitionMetadata)
+ })
+ .toMap
+ val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
+ (streamName, streamMetadata)
+ }
+ .toMap
+
+ // This is typically printed downstream and it can be spammy, so debug level here.
+ debug("Got metadata: %s" format allMetadata)
+
+ allMetadata
+ }
+
+ def getCoordinatorTopicProperties(config: KafkaConfig) = {
+ val segmentBytes = config.getCoordinatorSegmentBytes
+ (new Properties /: Map(
+ "cleanup.policy" -> "compact",
+ "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+ }
+
+ def getIntermediateStreamProperties(config: Config): Map[String, Properties] = {
+ val appConfig = new ApplicationConfig(config)
+ if (appConfig.getAppMode == ApplicationMode.BATCH) {
+ val streamConfig = new StreamConfig(config)
+ streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
+ val properties = new Properties()
+ properties.putAll(streamConfig.getStreamProperties(streamId))
+ properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ (streamId, properties)
+ }).toMap
+ } else {
+ Map()
+ }
+ }
+
+ def deleteMessages(adminClient : AdminClient, offsets: util.Map[SystemStreamPartition, String]) = {
+ val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) =>
+ (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
+ }.toMap
+ adminClient.deleteRecordsBefore(nextOffsets)
+ }
+}
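
The create/clear operations in the new utils object share a retry idiom built on Samza's ExponentialSleepStrategy: the first callback performs the work and marks the loop done, the second inspects the exception and either allows another iteration or rethrows. A standalone sketch of the idiom, with a deliberately flaky operation invented for illustration:

    import org.apache.samza.util.ExponentialSleepStrategy

    var attempts = 0
    def flakyOp(): String = {           // hypothetical operation for this sketch
      attempts += 1
      if (attempts < 3) throw new RuntimeException("transient failure")
      "ok"
    }

    val result = new ExponentialSleepStrategy(initialDelayMs = 500).run(
      loop => {
        val r = flakyOp()               // succeeds on the third attempt
        loop.done                       // stop retrying
        r
      },
      (exception, loop) => {
        // Decide whether to retry; clearStream/createStream above cap the
        // retry count here and rethrow once it is exhausted.
        println(s"retrying after: ${exception.getMessage}")
      })

    assert(result.contains("ok"))
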
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
deleted file mode 100644
index 10ce274..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ /dev/null
@@ -1,371 +0,0 @@
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.common.TopicAndPartition;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.KafkaConsumerConfig;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import org.apache.samza.util.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
-
- private static final long FETCH_THRESHOLD = 50000;
- private static final long FETCH_THRESHOLD_BYTES = -1L;
-
- private final Consumer<K, V> kafkaConsumer;
- private final String systemName;
- private final String clientId;
- private final AtomicBoolean stopped = new AtomicBoolean(false);
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Config config;
- private final boolean fetchThresholdBytesEnabled;
- private final KafkaSystemConsumerMetrics metrics;
-
- // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
- final KafkaConsumerMessageSink messageSink;
-
- // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
- // BlockingEnvelopMap's buffers.
- final private KafkaConsumerProxy proxy;
-
- // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
- final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
- final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
-
- long perPartitionFetchThreshold;
- long perPartitionFetchThresholdBytes;
-
- /**
- * Create a KafkaSystemConsumer for the provided {@code systemName}
- * @param kafkaConsumer kafka Consumer object to be used by this system consumer
- * @param systemName system name for which we create the consumer
- * @param config application config
- * @param clientId client id to be used with the kafka consumer
- * @param metrics metrics for this KafkaSystemConsumer
- * @param clock system clock
- */
- public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
- KafkaSystemConsumerMetrics metrics, Clock clock) {
-
- super(metrics.registry(), clock, metrics.getClass().getName());
-
- this.kafkaConsumer = kafkaConsumer;
- this.clientId = clientId;
- this.systemName = systemName;
- this.config = config;
- this.metrics = metrics;
-
- fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
-
- // create a sink for passing the messages between the proxy and the consumer
- messageSink = new KafkaConsumerMessageSink();
-
- // Create the proxy to do the actual message reading.
- String metricName = String.format("%s", systemName);
- proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
- LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
- }
-
- /**
- * Create internal kafka consumer object, which will be used in the Proxy.
- * @param systemName system name for which we create the consumer
- * @param clientId client id to use in the kafka client
- * @param config config
- * @return kafka consumer object
- */
- public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
-
- // extract kafka client configs
- KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
-
- LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig);
-
- return new KafkaConsumer(consumerConfig);
- }
-
- @Override
- public void start() {
- if (!started.compareAndSet(false, true)) {
- LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
- return;
- }
- if (stopped.get()) {
- LOG.error("{}: Attempting to start a stopped consumer", this);
- return;
- }
- // initialize the subscriptions for all the registered TopicPartitions
- startSubscription();
- // needs to be called after all the registrations are completed
- setFetchThresholds();
-
- startConsumer();
- LOG.info("{}: Consumer started", this);
- }
-
- private void startSubscription() {
- //subscribe to all the registered TopicPartitions
- LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
- try {
- synchronized (kafkaConsumer) {
- // we are using assign (and not subscribe), so we need to specify both topic and partition
- kafkaConsumer.assign(topicPartitionsToSSP.keySet());
- }
- } catch (Exception e) {
- throw new SamzaException("Consumer subscription failed for " + this, e);
- }
- }
-
- /**
- * Set the offsets to start from.
- * Register the TopicPartitions with the proxy.
- * Start the proxy.
- */
- void startConsumer() {
- // set the offset for each TopicPartition
- if (topicPartitionsToOffset.size() <= 0) {
- LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
- }
-
- topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
- long startingOffset = Long.valueOf(startingOffsetString);
-
- try {
- synchronized (kafkaConsumer) {
- kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
- }
- } catch (Exception e) {
- // all recoverable exceptions are handled by the client.
- // if we get here there is nothing left to do but bail out.
- String msg =
- String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
- LOG.error(msg, e);
- throw new SamzaException(msg, e);
- }
-
- LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
-
- // add the partition to the proxy
- proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
- });
-
- // start the proxy thread
- if (proxy != null && !proxy.isRunning()) {
- LOG.info("{}: Starting proxy {}", this, proxy);
- proxy.start();
- }
- }
-
- private void setFetchThresholds() {
- // get the thresholds, and set defaults if not defined.
- KafkaConfig kafkaConfig = new KafkaConfig(config);
-
- Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName);
- long fetchThreshold = FETCH_THRESHOLD;
- if (fetchThresholdOption.isDefined()) {
- fetchThreshold = Long.valueOf(fetchThresholdOption.get());
- }
-
- Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
- long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
- if (fetchThresholdBytesOption.isDefined()) {
- fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
- }
-
- int numPartitions = topicPartitionsToSSP.size();
- if (numPartitions != topicPartitionsToOffset.size()) {
- throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
- }
-
-
- if (numPartitions > 0) {
- perPartitionFetchThreshold = fetchThreshold / numPartitions;
- if (fetchThresholdBytesEnabled) {
- // currently this feature cannot be enabled, because we do not have the size of the messages available.
- // messages get double buffered, hence divide by 2
- perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
- }
- }
- LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
- this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
- }
-
- @Override
- public void stop() {
- if (!stopped.compareAndSet(false, true)) {
- LOG.warn("{}: Attempting to stop stopped consumer.", this);
- return;
- }
-
- LOG.info("{}: Stopping Samza kafkaConsumer ", this);
-
- // stop the proxy (with 1 minute timeout)
- if (proxy != null) {
- LOG.info("{}: Stopping proxy {}", this, proxy);
- proxy.stop(TimeUnit.SECONDS.toMillis(60));
- }
-
- try {
- synchronized (kafkaConsumer) {
- LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer);
- kafkaConsumer.close();
- }
- } catch (Exception e) {
- LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
- }
- }
-
- /**
- * record the ssp and the offset. Do not submit it to the consumer yet.
- * @param systemStreamPartition ssp to register
- * @param offset offset to register with
- */
- @Override
- public void register(SystemStreamPartition systemStreamPartition, String offset) {
- if (started.get()) {
- String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
- systemStreamPartition);
- throw new SamzaException(msg);
- }
-
- if (!systemStreamPartition.getSystem().equals(systemName)) {
- LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
- return;
- }
- LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
-
- super.register(systemStreamPartition, offset);
-
- TopicPartition tp = toTopicPartition(systemStreamPartition);
-
- topicPartitionsToSSP.put(tp, systemStreamPartition);
-
- String existingOffset = topicPartitionsToOffset.get(tp);
- // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
- if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
- topicPartitionsToOffset.put(tp, offset);
- }
-
- metrics.registerTopicAndPartition(toTopicAndPartition(tp));
- }
-
- /**
- * Compare two String offsets.
- * Note: there is a method in KafkaSystemAdmin that does this, but using it would require instantiating a SystemAdmin for each consumer.
- * @return see {@link Long#compareTo(Long)}
- */
- private static int compareOffsets(String offset1, String offset2) {
- return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
- }
-
- @Override
- public String toString() {
- return String.format("%s:%s", systemName, clientId);
- }
-
- @Override
- public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
- Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
-
- // check if the proxy is running
- if (!proxy.isRunning()) {
- stop();
- String message = String.format("%s: KafkaConsumerProxy has stopped.", this);
- throw new SamzaException(message, proxy.getFailureCause());
- }
-
- return super.poll(systemStreamPartitions, timeout);
- }
-
- /**
- * convert from TopicPartition to TopicAndPartition
- */
- public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
- return new TopicAndPartition(tp.topic(), tp.partition());
- }
-
- /**
- * convert to TopicPartition from SystemStreamPartition
- */
- public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
- return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
- }
-
- /**
- * return system name for this consumer
- * @return system name
- */
- public String getSystemName() {
- return systemName;
- }
-
- public class KafkaConsumerMessageSink {
-
- public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
- setIsAtHead(ssp, isAtHighWatermark);
- }
-
- boolean needsMoreMessages(SystemStreamPartition ssp) {
- LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
- + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
- getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
- perPartitionFetchThreshold);
-
- if (fetchThresholdBytesEnabled) {
- return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
- } else {
- return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
- }
- }
-
- void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
- LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope);
-
- try {
- put(ssp, envelope);
- } catch (InterruptedException e) {
- throw new SamzaException(
- String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this,
- envelope.getOffset(), ssp), e);
- }
- }
- }
-}
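
Before it was removed, setFetchThresholds() split the global fetch limits evenly across registered partitions, halving the byte limit first because messages are double-buffered between the proxy and the BlockingEnvelopeMap. A standalone sketch of that arithmetic, with illustrative values (only the 50000 message default comes from the code above):

    val fetchThreshold = 50000L         // default message-count threshold
    val fetchThresholdBytes = 1L << 20  // assumed 1 MiB, for this example only
    val numPartitions = 8

    val perPartitionFetchThreshold = fetchThreshold / numPartitions
    val perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions

    assert(perPartitionFetchThreshold == 6250)
    assert(perPartitionFetchThresholdBytes == 65536)
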
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index ba5390b..f314f92 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -21,13 +21,11 @@ package org.apache.samza.system.kafka
import java.util.Properties
-import kafka.utils.ZkUtils
+import com.google.common.annotations.VisibleForTesting
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.config.StorageConfig._
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
@@ -35,32 +33,40 @@ import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, Syst
import org.apache.samza.util._
object KafkaSystemFactory extends Logging {
+ @VisibleForTesting
def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
Map[String, String]("compression.type" -> "none")
} else {
Map[String, String]()
}
+
+ val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
+ val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
+ val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
}
class KafkaSystemFactory extends SystemFactory with Logging {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
- val clientId = KafkaConsumerConfig.getConsumerClientId( config)
val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
- val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
+ val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_CONSUMER_PREFIX, config)
+ val kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId)
+
+ val kafkaConsumer = KafkaSystemConsumer.createKafkaConsumerImpl[Array[Byte], Array[Byte]](systemName, kafkaConsumerConfig)
info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer))
- val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock)
- info("Created samza system consumer %s" format (kafkaSystemConsumer.toString))
+ val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics,
+ new SystemClock)
+ info("Created samza system consumer for system %s, config %s: %s" format(systemName, config, kafkaSystemConsumer))
kafkaSystemConsumer
}
def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
- val clientId = KafkaConsumerConfig.getProducerClientId(config)
val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
+ val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
val getProducer = () => {
new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
@@ -70,6 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
// Unlike consumer, no need to use encoders here, since they come for free
// inside the producer configs. Kafka's producer will handle all of this
// for us.
+ info("Creating kafka producer for system %s, producerClientId %s" format(systemName, clientId))
new KafkaSystemProducer(
systemName,
@@ -80,43 +87,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
}
def getAdmin(systemName: String, config: Config): SystemAdmin = {
- val clientId = KafkaConsumerConfig.getAdminClientId(config)
- val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
- val bootstrapServers = producerConfig.bootsrapServers
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
- val timeout = consumerConfig.socketTimeoutMs
- val bufferSize = consumerConfig.socketReceiveBufferBytes
- val zkConnect = Option(consumerConfig.zkConnect)
- .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
- val connectZk = () => {
- ZkUtils(zkConnect, 6000, 6000, false)
- }
- val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
- val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
- val storeToChangelog = config.getKafkaChangelogEnabledStores()
- // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
- val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => {
- val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
- val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
- info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor))
- (topicName, changelogInfo)
- }
- }
+ // extract kafka client configs
+ val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_ADMIN_PREFIX, config)
+ val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId)
- val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean)
- val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
- new KafkaSystemAdmin(
- systemName,
- bootstrapServers,
- connectZk,
- coordinatorStreamProperties,
- coordinatorStreamReplicationFactor,
- timeout,
- bufferSize,
- clientId,
- topicMetaInformation,
- intermediateStreamProperties,
- deleteCommittedMessages)
+ new KafkaSystemAdmin(systemName, config, KafkaSystemConsumer.createKafkaConsumerImpl(systemName, consumerConfig))
}
def getCoordinatorTopicProperties(config: Config) = {
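
The factory's getInjectedProducerProperties (unchanged above apart from the new @VisibleForTesting annotation) encodes one rule: producers for a changelog system get compression disabled, because Kafka does not support compression for log-compacted topics. A sketch of the rule in isolation (the boolean parameter stands in for the real config.isChangelogSystem check):

    def injectedProducerProperties(isChangelogSystem: Boolean): Map[String, String] =
      if (isChangelogSystem) Map("compression.type" -> "none") else Map.empty

    assert(injectedProducerProperties(true) == Map("compression.type" -> "none"))
    assert(injectedProducerProperties(false).isEmpty)
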
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 2d09301..90dfff3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -37,17 +37,6 @@ object KafkaUtil extends Logging {
val CHECKPOINT_LOG_VERSION_NUMBER = 1
val counter = new AtomicLong(0)
- def getClientId(id: String, config: Config): String = getClientId(
- id,
- config.getName.getOrElse(throw new ConfigException("Missing job name.")),
- config.getJobId)
-
- def getClientId(id: String, jobName: String, jobId: String): String =
- "%s-%s-%s" format
- (id.replaceAll("[^A-Za-z0-9]", "_"),
- jobName.replaceAll("[^A-Za-z0-9]", "_"),
- jobId.replaceAll("[^A-Za-z0-9]", "_"))
-
private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
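
The removed getClientId helpers encoded the client-id convention that the new KafkaConsumerConfig.createClientId preserves (see the tests below): each component has non-alphanumeric characters replaced by '_', and the parts are joined with '-'. A sketch of the convention, with helper names that are illustrative rather than Samza API:

    def sanitize(part: String): String = part.replaceAll("[^A-Za-z0-9]", "_")

    def clientId(prefix: String, jobName: String, jobId: String): String =
      "%s-%s-%s" format (sanitize(prefix), sanitize(jobName), sanitize(jobId))

    assert(clientId("consumer", "jobName", "jobId") == "consumer-jobName-jobId")
    assert(clientId("super-duper-consumer", "jobName", "jobId")
      == "super_duper_consumer-jobName-jobId")
    assert(clientId("consumer", " very important!job", "number-#3")
      == "consumer-_very_important_job-number__3")
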
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
index de5d093..62f6269 100644
--- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -30,9 +30,11 @@ import org.junit.Test;
public class TestKafkaConsumerConfig {
public final static String SYSTEM_NAME = "testSystem";
+ public final static String JOB_NAME = "jobName";
+ public final static String JOB_ID = "jobId";
public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
- private final static String CLIENT_ID = "clientId";
+ private final static String CLIENT_ID_PREFIX = "consumer-client";
@Test
public void testDefaults() {
@@ -44,15 +46,16 @@ public class TestKafkaConsumerConfig {
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
"100"); // should NOT be ignored
- props.put(JobConfig.JOB_NAME(), "jobName");
+ props.put(JobConfig.JOB_NAME(), JOB_NAME);
// if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignoreThis:9092");
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
Config config = new MapConfig(props);
+ String clientId = KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config);
KafkaConsumerConfig kafkaConsumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId);
Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
@@ -71,21 +74,34 @@ public class TestKafkaConsumerConfig {
Assert.assertEquals(ByteArrayDeserializer.class.getName(),
kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
- Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+ // validate group and client id generation
+ Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-" + JOB_NAME + "-" + "1",
+ kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+ Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1",
+ KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config));
+
+ Assert.assertEquals("jobName-1", KafkaConsumerConfig.createConsumerGroupId(config));
+
+ // validate setting of group and client id
+ Assert.assertEquals(KafkaConsumerConfig.createConsumerGroupId(config),
+ kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
- Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
+ Assert.assertEquals(KafkaConsumerConfig.createConsumerGroupId(config),
kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
- Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1",
- KafkaConsumerConfig.getConsumerClientId(config));
- Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config));
- props.put(JobConfig.JOB_ID(), "jobId");
+ Assert.assertEquals(KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config),
+ kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+ // with non-default job id
+ props.put(JobConfig.JOB_ID(), JOB_ID);
config = new MapConfig(props);
+ Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId",
+ KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config));
+
+ Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.createConsumerGroupId(config));
- Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId",
- KafkaConsumerConfig.getConsumerClientId(config));
- Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config));
}
// test stuff that should not be overridden
@@ -103,8 +119,9 @@ public class TestKafkaConsumerConfig {
props.put(JobConfig.JOB_NAME(), "jobName");
Config config = new MapConfig(props);
+ String clientId = KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config);
KafkaConsumerConfig kafkaConsumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId);
Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -121,29 +138,30 @@ public class TestKafkaConsumerConfig {
map.put(JobConfig.JOB_NAME(), "jobName");
map.put(JobConfig.JOB_ID(), "jobId");
- String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+
+ String result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-jobName-jobId", result);
- result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+ result = KafkaConsumerConfig.createClientId("consumer-", new MapConfig(map));
Assert.assertEquals("consumer_-jobName-jobId", result);
- result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.createClientId("super-duper-consumer", new MapConfig(map));
Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
map.put(JobConfig.JOB_NAME(), " very important!job");
- result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-_very_important_job-jobId", result);
map.put(JobConfig.JOB_ID(), "number-#3");
- result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-_very_important_job-number__3", result);
}
@Test(expected = SamzaException.class)
public void testNoBootstrapServers() {
- KafkaConsumerConfig kafkaConsumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME,
- "clientId");
+ Config config = new MapConfig(Collections.emptyMap());
+ String clientId = KafkaConsumerConfig.createClientId("clientId", config);
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId);
Assert.fail("didn't get exception for the missing config: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 7e968bf..27601b0 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,14 +19,22 @@
package org.apache.samza.system.kafka;
+import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Properties;
import kafka.api.TopicMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -36,6 +44,97 @@ import static org.junit.Assert.*;
public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@Test
+ public void testGetOffsetsAfter() {
+ SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM(), TOPIC(), new Partition(0));
+ SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM(), TOPIC(), new Partition(1));
+ Map<SystemStreamPartition, String> offsets = new HashMap<>();
+ offsets.put(ssp1, "1");
+ offsets.put(ssp2, "2");
+
+ offsets = systemAdmin().getOffsetsAfter(offsets);
+
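+ // for Kafka the next offset appears to be a plain numeric increment, so no
+ // broker lookup should be involved here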
+ Assert.assertEquals("2", offsets.get(ssp1));
+ Assert.assertEquals("3", offsets.get(ssp2));
+ }
+
+ @Test
+ public void testToKafkaSpec() {
+ String topicName = "testStream";
+
+ int defaultPartitionCount = 2;
+ int changeLogPartitionFactor = 5;
+ Map<String, String> map = new HashMap<>();
+ Config config = new MapConfig(map);
+ StreamSpec spec = new StreamSpec("id", topicName, SYSTEM(), defaultPartitionCount, config);
+
+ KafkaSystemAdmin kafkaAdmin = systemAdmin();
+ KafkaStreamSpec kafkaSpec = kafkaAdmin.toKafkaSpec(spec);
+
+ Assert.assertEquals("id", kafkaSpec.getId());
+ Assert.assertEquals(topicName, kafkaSpec.getPhysicalName());
+ Assert.assertEquals(SYSTEM(), kafkaSpec.getSystemName());
+ Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
+
+ // validate that conversion is using coordination metadata
+ map.put("job.coordinator.segment.bytes", "123");
+ map.put("job.coordinator.cleanup.policy", "superCompact");
+ int coordReplicationFactor = 4;
+ map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
+ String.valueOf(coordReplicationFactor));
+
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+ spec = StreamSpec.createCoordinatorStreamSpec(topicName, SYSTEM());
+ kafkaSpec = admin.toKafkaSpec(spec);
+ Assert.assertEquals(coordReplicationFactor, kafkaSpec.getReplicationFactor());
+ Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("segment.bytes"));
+ // cleanup policy is overridden in the KafkaAdmin
+ Assert.assertEquals("compact", kafkaSpec.getProperties().getProperty("cleanup.policy"));
+
+ // validate that conversion is using changeLog metadata
+ map = new HashMap<>();
+ map.put(JobConfig.JOB_DEFAULT_SYSTEM(), SYSTEM());
+
+ map.put(String.format("stores.%s.changelog", "fakeStore"), topicName);
+ int changeLogReplicationFactor = 3;
+ map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"),
+ String.valueOf(changeLogReplicationFactor));
+ admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+ spec = StreamSpec.createChangeLogStreamSpec(topicName, SYSTEM(), changeLogPartitionFactor);
+ kafkaSpec = admin.toKafkaSpec(spec);
+ Assert.assertEquals(changeLogReplicationFactor, kafkaSpec.getReplicationFactor());
+
+ // same, but with missing topic info
+ try {
+ admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+ spec = StreamSpec.createChangeLogStreamSpec("anotherTopic", SYSTEM(), changeLogPartitionFactor);
+ kafkaSpec = admin.toKafkaSpec(spec);
+ Assert.fail("toKafkaSpec should've failed for missing topic");
+ } catch (StreamValidationException e) {
+ // expected
+ }
+
+ // validate that conversion is using intermediate streams properties
+ String interStreamId = "isId";
+
+ Map<String, String> interStreamMap = new HashMap<>();
+ interStreamMap.put("app.mode", ApplicationConfig.ApplicationMode.BATCH.toString());
+ interStreamMap.put(String.format("streams.%s.samza.intermediate", interStreamId), "true");
+ interStreamMap.put(String.format("streams.%s.samza.system", interStreamId), "testSystem");
+ interStreamMap.put(String.format("streams.%s.p1", interStreamId), "v1");
+ interStreamMap.put(String.format("streams.%s.retention.ms", interStreamId), "123");
+ // legacy format
+ interStreamMap.put(String.format("systems.%s.streams.%s.p2", "testSystem", interStreamId), "v2");
+
+ admin = Mockito.spy(createSystemAdmin(SYSTEM(), interStreamMap));
+ spec = new StreamSpec(interStreamId, topicName, SYSTEM(), defaultPartitionCount, config);
+ kafkaSpec = admin.toKafkaSpec(spec);
+ Assert.assertEquals("v1", kafkaSpec.getProperties().getProperty("p1"));
+ Assert.assertEquals("v2", kafkaSpec.getProperties().getProperty("p2"));
+ Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("retention.ms"));
+ Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
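+ // both the streams.{id}.* format and the legacy systems.{system}.streams.{id}.*
+ // format appear to be merged into the properties of the resulting Kafka spec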
+ }
+
+ @Test
public void testCreateCoordinatorStream() {
SystemAdmin admin = Mockito.spy(systemAdmin());
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
@@ -49,10 +148,14 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
final String STREAM = "test.coordinator_test.Stream";
- Properties coordProps = new Properties();
- Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+ Map<String, String> map = new HashMap<>();
+ map.put("job.coordinator.segment.bytes", "123");
+ map.put("job.coordinator.cleanup.policy", "compact");
+ int coordReplicationFactor = 2;
+ map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
+ String.valueOf(coordReplicationFactor));
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
Mockito.doAnswer(invocationOnMock -> {
@@ -62,6 +165,10 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
assertEquals(SYSTEM(), internalSpec.getSystemName());
assertEquals(STREAM, internalSpec.getPhysicalName());
assertEquals(1, internalSpec.getPartitionCount());
+ Assert.assertEquals(coordReplicationFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+ Assert.assertEquals("123", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
+ // cleanup policy is overridden in the KafkaAdmin
+ Assert.assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
@@ -71,62 +178,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
@Test
- public void testCreateChangelogStream() {
- final String STREAM = "testChangeLogStream";
- final int PARTITIONS = 12;
- final int REP_FACTOR = 1;
-
- Properties coordProps = new Properties();
- Properties changeLogProps = new Properties();
- changeLogProps.setProperty("cleanup.policy", "compact");
- changeLogProps.setProperty("segment.bytes", "139");
- Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
-
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
- StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
-
- Mockito.doAnswer(invocationOnMock -> {
- StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
- assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
- assertTrue(internalSpec.isChangeLogStream());
- assertEquals(SYSTEM(), internalSpec.getSystemName());
- assertEquals(STREAM, internalSpec.getPhysicalName());
- assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
- assertEquals(PARTITIONS, internalSpec.getPartitionCount());
- assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
-
- return internalSpec;
- }).when(admin).toKafkaSpec(Mockito.any());
-
- admin.createStream(spec);
- admin.validateStream(spec);
+ public void testCreateChangelogStreamHelp() {
+ testCreateChangelogStreamHelp("testChangeLogStream");
}
@Test
public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
- final String STREAM = "test.Change_Log.Stream";
+ // the changelog topic name cannot contain a period, so dashes are used instead
+ testCreateChangelogStreamHelp("test-Change_Log-Stream");
+ }
+
+ public void testCreateChangelogStreamHelp(final String topic) {
final int PARTITIONS = 12;
- final int REP_FACTOR = 1;
+ final int REP_FACTOR = 2;
- Properties coordProps = new Properties();
- Properties changeLogProps = new Properties();
- changeLogProps.setProperty("cleanup.policy", "compact");
- changeLogProps.setProperty("segment.bytes", "139");
- Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+ Map<String, String> map = new HashMap<>();
+ map.put(JobConfig.JOB_DEFAULT_SYSTEM(), SYSTEM());
+ map.put(String.format("stores.%s.changelog", "fakeStore"), topic);
+ map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(REP_FACTOR));
+ map.put(String.format("stores.%s.changelog.kafka.segment.bytes", "fakeStore"), "139");
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+ StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM(), PARTITIONS);
- KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
- StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isChangeLogStream());
assertEquals(SYSTEM(), internalSpec.getSystemName());
- assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(topic, internalSpec.getPhysicalName());
assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
assertEquals(PARTITIONS, internalSpec.getPartitionCount());
- assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+ assertEquals("139", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
+ assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
@@ -176,7 +259,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
systemAdmin().validateStream(spec2);
}
- @Test
//@Test //TODO - currently the connection to ZK fails, but since the test only checks that the metadata is empty, it succeeds anyway. SAMZA-1887
public void testClearStream() {
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
@@ -184,8 +267,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
systemAdmin().createStream(spec));
assertTrue(systemAdmin().clearStream(spec));
- scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
- scala.collection.immutable.Map<String, TopicMetadata> metadata = systemAdmin().getTopicMetadata(topic);
- assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
+ ImmutableSet<String> topics = ImmutableSet.of(spec.getPhysicalName());
+ Map<String, List<PartitionInfo>> metadata = systemAdmin().getTopicMetadata(topics);
+ assertTrue(metadata.get(spec.getPhysicalName()).isEmpty());
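+ // note: empty partition metadata is also what a failed ZK connection produces,
+ // which is why this test is disabled above (see SAMZA-1887)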
}
}