You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/05 22:36:41 UTC
samza git commit: SAMZA-1442: use systemAdmin.validateStream in KafkaCheckpointManager
Repository: samza
Updated Branches:
refs/heads/master 7f1ec6492 -> b989e51ae
SAMZA-1442: use systemAdmin.validateStream in KafkaCheckpointManager
Author: Boris S <bo...@apache.org>
Reviewers: Jacob Maes <ja...@gmail.com>
Closes #314 from sborya/CheckpointTopicValidation
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b989e51a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b989e51a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b989e51a
Branch: refs/heads/master
Commit: b989e51ae866fee9cdba76a457992afc70695fdb
Parents: 7f1ec64
Author: Boris S <bo...@apache.org>
Authored: Thu Oct 5 15:36:37 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Thu Oct 5 15:36:37 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/StreamSpec.java | 4 +
.../kafka/KafkaCheckpointManager.scala | 23 ++++-
.../scala/org/apache/samza/util/KafkaUtil.scala | 88 --------------------
.../kafka/TestKafkaCheckpointManager.scala | 25 ++++--
4 files changed, 45 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 6ea1a22..523ff68 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -81,6 +81,10 @@ public class StreamSpec {
*/
private final Map<String, String> config;
+  /**
+   * Returns a one-line summary of this spec: its logical id, system name,
+   * physical name and partition count (the config map is not included).
+   */
+  @Override
+  public String toString() {
+    final String template = "StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.";
+    return String.format(template, id, systemName, physicalName, partitionCount);
+  }
/**
* @param id The application-unique logical identifier for the stream. It is used to distinguish between
* streams in a Samza application so it must be unique in the context of one deployable unit.
http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1e22763..4eb6666 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -29,6 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
import org.apache.samza.util._
import org.apache.samza.{Partition, SamzaException}
@@ -252,8 +253,26 @@ class KafkaCheckpointManager(
}
override def start {
- kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties)
- kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation)
+    // Creates the single-partition checkpoint stream through systemAdmin and then validates it.
+
+    // KafkaStreamSpec takes a logical stream id; the literal marks this placeholder as unused.
+    val checkpointStreamId = "unused-temp-checkpoint-stream-id"
+    // The checkpoint topic is specified with exactly one partition.
+    val spec = new KafkaStreamSpec(checkpointStreamId,
+      checkpointTopic, systemName, 1,
+      replicationFactor, checkpointTopicProperties)
+
+    info("About to create checkpoint stream: " + spec)
+    systemAdmin.createStream(spec)
+    info("Created checkpoint stream: " + spec)
+
+    // Checkpoint-specific leniency: a StreamValidationException is fatal only when
+    // failOnCheckpointValidation is true (the guard below then does not match, so the
+    // exception propagates). Otherwise it is logged together with its reason and startup
+    // continues. Every other exception propagates unchanged.
+    try {
+      systemAdmin.validateStream(spec)
+      info("Validated spec: " + spec)
+    } catch {
+      case e: StreamValidationException if !failOnCheckpointValidation =>
+        warn("Checkpoint stream validation partially failed (" + e.getMessage +
+          "). Ignoring it because failOnCheckpointValidation=" + failOnCheckpointValidation)
+    }
}
override def register(taskName: TaskName) {
http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 39edba7..1410cbb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -91,94 +91,6 @@ object KafkaUtil extends Logging {
class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
val connectZk: () => ZkUtils) extends Logging {
- /**
- * Common code for creating a topic in Kafka
- *
- * @param topicName Name of the topic to be created
- * @param partitionCount Number of partitions in the topic
- * @param replicationFactor Number of replicas for the topic
- * @param topicProperties Any topic related properties
- */
- def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties) {
- info("Attempting to create topic %s." format topicName)
- retryBackoff.run(
- loop => {
- val zkClient = connectZk()
- try {
- AdminUtils.createTopic(
- zkClient,
- topicName,
- partitionCount,
- replicationFactor,
- topicProperties)
- } finally {
- zkClient.close
- }
-
- info("Created topic %s." format topicName)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case tee: TopicExistsException =>
- info("Topic %s already exists." format topicName)
- loop.done
- case e: Exception =>
- warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
- debug("Exception detail:", e)
- }
- }
- )
- }
-
- /**
- * Common code to validate partition count in a topic
- *
- * @param topicName Name of the topic to be validated
- * @param systemName Kafka system to use
- * @param metadataStore Topic Metadata store
- * @param expectedPartitionCount Expected number of partitions
- * @param failOnValidation If true - fail the job if the validation fails
- */
- def validateTopicPartitionCount(topicName: String,
- systemName: String,
- metadataStore: TopicMetadataStore,
- expectedPartitionCount: Int,
- failOnValidation: Boolean = true) {
- info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount))
- retryBackoff.run(
- loop => {
- val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
- val topicMetadata = topicMetadataMap(topicName)
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
- val partitionCount = topicMetadata.partitionsMetadata.length
- if (partitionCount != expectedPartitionCount)
- {
- val msg = "Validation failed for topic %s because partition count %s did not " +
- "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount)
- if (failOnValidation) {
- throw new KafkaUtilException(msg)
- } else {
- warn(msg + " Ignoring the failure.")
- }
- }
-
- info("Successfully validated topic %s." format topicName)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: KafkaUtilException => throw e
- case e: Exception =>
- warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
- debug("Exception detail:", e)
- }
- }
- )
- }
/**
* Code to verify that a topic exists
http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 43b912d..3337a36 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,6 +19,9 @@
package org.apache.samza.checkpoint.kafka
+
+import java.util.Properties
+
import _root_.kafka.admin.AdminUtils
import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
import _root_.kafka.integration.KafkaServerTestHarness
@@ -33,6 +36,7 @@ import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system._
+import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
@@ -71,6 +75,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
var systemConsumerFn: ()=>SystemConsumer = ()=>{null}
var systemProducerFn: ()=>SystemProducer = ()=>{null}
var systemAdminFn: ()=>SystemAdmin = ()=>{null}
+
+  val systemName = "kafka"
+  val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id"
+
+  /**
+   * Builds the [[KafkaStreamSpec]] used to validate a checkpoint topic in these tests:
+   * one partition, replication factor 1 and no extra topic properties.
+   *
+   * @param topic physical name of the checkpoint topic the spec should describe
+   * @return a spec for `topic` in system `systemName`
+   */
+  def checkpointStreamSpec(topic: String): KafkaStreamSpec =
+    new KafkaStreamSpec(CHECKPOINT_STREAMID, topic, systemName, 1, 1, new Properties())
+
+  // Spec for the default checkpointTopic only. A test that creates a different topic
+  // should validate against checkpointStreamSpec(thatTopic) instead of this value.
+  val kafkaStreamSpec = checkpointStreamSpec(checkpointTopic)
@Before
override def setUp {
@@ -78,7 +88,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
- val systemName = "kafka"
val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
val config = new java.util.HashMap[String, String]()
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
@@ -102,6 +111,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())}
systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())}
systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)}
+
}
@After
@@ -153,7 +163,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val taskName = new TaskName(partition.toString)
kcm.register(taskName)
createCheckpointTopic()
- kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+ val systemAdmin = systemAdminFn()
+ systemAdmin.validateStream(kafkaStreamSpec)
// check that log compaction is enabled.
val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
@@ -194,7 +205,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val taskName = new TaskName(partition.toString)
kcm.register(taskName)
createCheckpointTopic()
- kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+ val systemAdmin = systemAdminFn()
+ systemAdmin.validateStream(kafkaStreamSpec)
+
// check that log compaction is enabled.
val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
@@ -234,7 +247,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val taskName = new TaskName(partition.toString)
kcm.register(taskName)
createCheckpointTopic(serdeCheckpointTopic)
- kcm.kafkaUtil.validateTopicPartitionCount(serdeCheckpointTopic, "kafka", metadataStore, 1)
+ val systemAdmin = systemAdminFn()
+ systemAdmin.validateStream(kafkaStreamSpec)
+
writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
// because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
try {
@@ -261,7 +276,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
kcm.start
fail("Expected a KafkaUtilException for invalid number of partitions in the topic.")
}catch {
- case e: KafkaUtilException => None
+ case e: StreamValidationException => None
}
kcm.stop