You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/05 22:36:41 UTC

samza git commit: SAMZA-1442: use systemAdmin.validateStream in KafkaCheckpointManager

Repository: samza
Updated Branches:
  refs/heads/master 7f1ec6492 -> b989e51ae


SAMZA-1442: use systemAdmin.validateStream in KafkaCheckpointManager

Author: Boris S <bo...@apache.org>

Reviewers: Jacob Maes <ja...@gmail.com>

Closes #314 from sborya/CheckpointTopicValidation


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b989e51a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b989e51a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b989e51a

Branch: refs/heads/master
Commit: b989e51ae866fee9cdba76a457992afc70695fdb
Parents: 7f1ec64
Author: Boris S <bo...@apache.org>
Authored: Thu Oct 5 15:36:37 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Thu Oct 5 15:36:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/StreamSpec.java     |  4 +
 .../kafka/KafkaCheckpointManager.scala          | 23 ++++-
 .../scala/org/apache/samza/util/KafkaUtil.scala | 88 --------------------
 .../kafka/TestKafkaCheckpointManager.scala      | 25 ++++--
 4 files changed, 45 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 6ea1a22..523ff68 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -81,6 +81,10 @@ public class StreamSpec {
    */
   private final Map<String, String> config;
 
+  @Override
+  public String toString() { // One-line summary for logs; the config map is not included.
+    return String.format("StreamSpec: id=%s, systemName=%s, physicalName=%s, partitionCount=%d.", id, systemName, physicalName, partitionCount);
+  }
   /**
    *  @param id           The application-unique logical identifier for the stream. It is used to distinguish between
    *                      streams in a Samza application so it must be unique in the context of one deployable unit.

http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1e22763..4eb6666 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -29,6 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
 import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
 import org.apache.samza.util._
 import org.apache.samza.{Partition, SamzaException}
@@ -252,8 +253,26 @@ class KafkaCheckpointManager(
   }
 
   override def start {
-    kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties)
-    kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation)
+    // The checkpoint topic is described with exactly one partition; create it, then validate it.
+    val checkpointStreamId = "unused-temp-checkpoint-stream-id"
+    val spec = new KafkaStreamSpec(checkpointStreamId,
+                                   checkpointTopic, systemName, 1,
+                                   replicationFactor, checkpointTopicProperties)
+
+    info("About to create checkpoint stream: " + spec)
+    systemAdmin.createStream(spec)
+    info("Created checkpoint stream: " + spec)
+    try {
+      systemAdmin.validateStream(spec)
+      info("Validated spec: " + spec)
+    } catch {
+      // Special rule for the checkpoint stream: a StreamValidationException is fatal only
+      // when failOnCheckpointValidation is true; otherwise it is logged and ignored.
+      // Any other exception is not matched here and propagates to the caller unchanged.
+      case e: StreamValidationException if !failOnCheckpointValidation =>
+        warn("Checkpoint stream validation partially failed: " + e.getMessage +
+             ". Ignoring it because failOnCheckpointValidation=" + failOnCheckpointValidation)
+    }
   }
 
   override def register(taskName: TaskName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 39edba7..1410cbb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -91,94 +91,6 @@ object KafkaUtil extends Logging {
 
 class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                 val connectZk: () => ZkUtils) extends Logging {
-  /**
-   * Common code for creating a topic in Kafka
-   *
-   * @param topicName Name of the topic to be created
-   * @param partitionCount  Number of partitions in the topic
-   * @param replicationFactor Number of replicas for the topic
-   * @param topicProperties Any topic related properties
-   */
-  def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties) {
-    info("Attempting to create topic %s." format topicName)
-    retryBackoff.run(
-      loop => {
-        val zkClient = connectZk()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            topicName,
-            partitionCount,
-            replicationFactor,
-            topicProperties)
-        } finally {
-          zkClient.close
-        }
-
-        info("Created topic %s." format topicName)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case tee: TopicExistsException =>
-            info("Topic %s already exists." format topicName)
-            loop.done
-          case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  /**
-   * Common code to validate partition count in a topic
-   *
-   * @param topicName Name of the topic to be validated
-   * @param systemName  Kafka system to use
-   * @param metadataStore Topic Metadata store
-   * @param expectedPartitionCount  Expected number of partitions
-   * @param failOnValidation If true - fail the job if the validation fails
-   */
-  def validateTopicPartitionCount(topicName: String,
-                                  systemName: String,
-                                  metadataStore: TopicMetadataStore,
-                                  expectedPartitionCount: Int,
-                                  failOnValidation: Boolean = true) {
-    info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount))
-    retryBackoff.run(
-      loop => {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(topicName)
-        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
-        val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount != expectedPartitionCount)
-        {
-          val msg = "Validation failed for topic %s because partition count %s did not " +
-                  "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount)
-          if (failOnValidation) {
-            throw new KafkaUtilException(msg)
-          } else {
-            warn(msg + " Ignoring the failure.")
-          }
-        }
-
-        info("Successfully validated topic %s." format topicName)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: KafkaUtilException => throw e
-          case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
 
   /**
    * Code to verify that a topic exists

http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 43b912d..3337a36 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,6 +19,9 @@
 
 package org.apache.samza.checkpoint.kafka
 
+
+import java.util.Properties
+
 import _root_.kafka.admin.AdminUtils
 import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
 import _root_.kafka.integration.KafkaServerTestHarness
@@ -33,6 +36,7 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system._
+import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore}
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
@@ -71,6 +75,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
   var systemConsumerFn: ()=>SystemConsumer = ()=>{null}
   var systemProducerFn: ()=>SystemProducer = ()=>{null}
   var systemAdminFn: ()=>SystemAdmin = ()=>{null}
+
+  val systemName = "kafka"
+  val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id" // same id string KafkaCheckpointManager.start uses
+  val kafkaStreamSpec = new KafkaStreamSpec(CHECKPOINT_STREAMID, // checkpointTopic: 1 partition, replication factor 1
+                                 checkpointTopic, systemName, 1,
+                                 1, new Properties())
 
   @Before
   override def setUp {
@@ -78,7 +88,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
     TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
 
-    val systemName = "kafka"
     val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
     val config = new java.util.HashMap[String, String]()
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
@@ -102,6 +111,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())}
     systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())}
     systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)}
+    
   }
 
   @After
@@ -153,7 +163,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val taskName = new TaskName(partition.toString)
     kcm.register(taskName)
     createCheckpointTopic()
-    kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+    // The checkpoint topic must validate against the single-partition kafkaStreamSpec.
+    systemAdminFn().validateStream(kafkaStreamSpec)
 
     // check that log compaction is enabled.
     val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
@@ -194,7 +205,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val taskName = new TaskName(partition.toString)
     kcm.register(taskName)
     createCheckpointTopic()
-    kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
+    // Validate the checkpoint topic against the single-partition kafkaStreamSpec.
+    val checkpointAdmin = systemAdminFn()
+    checkpointAdmin.validateStream(kafkaStreamSpec)
 
     // check that log compaction is enabled.
     val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
@@ -234,7 +247,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       val taskName = new TaskName(partition.toString)
       kcm.register(taskName)
       createCheckpointTopic(serdeCheckpointTopic)
-      kcm.kafkaUtil.validateTopicPartitionCount(serdeCheckpointTopic, "kafka", metadataStore, 1)
+      // Validate the topic this test writes to; kafkaStreamSpec describes checkpointTopic instead.
+      val serdeStreamSpec = new KafkaStreamSpec(CHECKPOINT_STREAMID, serdeCheckpointTopic, systemName, 1, 1, new Properties())
+      systemAdminFn().validateStream(serdeStreamSpec)
       writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
       // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
       try {
@@ -261,7 +276,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       kcm.start
       fail("Expected a KafkaUtilException for invalid number of partitions in the topic.")
     }catch {
-      case e: KafkaUtilException => None
+      case e: StreamValidationException => None
     }
     kcm.stop