You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:44 UTC

[03/26] samza git commit: SAMZA-1387: Unable to Start Samza App Because Regex Check

SAMZA-1387: Unable to Start Samza App Because Regex Check

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Fred Ji <fr...@yahoo.com>

Closes #266 from jmakes/samza-1387


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06702af8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06702af8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06702af8

Branch: refs/heads/0.14.0
Commit: 06702af8fda1d016ae55461c404b55b84b20ffd2
Parents: 8f7f567
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Aug 11 09:28:20 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Aug 11 09:28:20 2017 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaSystemAdmin.scala    |  7 ++++---
 .../system/kafka/TestKafkaSystemAdminJava.java   | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index af77d5b..1e59b61 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -38,8 +38,9 @@ import scala.collection.JavaConverters._
 
 object KafkaSystemAdmin extends Logging {
   // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
-  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and SAMZA-1387
   val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+  val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id"
 
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
@@ -331,7 +332,7 @@ class KafkaSystemAdmin(
   override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
 
-    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+    val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
 
     if (createStream(streamSpec)) {
       info("Created coordinator stream %s." format streamName)
@@ -496,7 +497,7 @@ class KafkaSystemAdmin(
   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
-  
+
   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
     val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)

http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index ce59b40..33c4017 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -51,6 +51,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   @Test
+  public void testCreateCoordinatorStreamDelegatesToCreateStream_specialCharsInTopicName() {
+    final String STREAM = "test.Coord_inator.Stream";
+
+    SystemAdmin admin = Mockito.spy(createSystemAdmin());
+    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM());
+    admin.createCoordinatorStream(STREAM);
+    admin.validateStream(spec);
+
+    ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
+    Mockito.verify(admin).createStream(specCaptor.capture());
+
+    StreamSpec internalSpec = specCaptor.getValue();
+    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+    assertEquals(KafkaSystemAdmin.COORDINATOR_STREAMID(), internalSpec.getId());
+    assertEquals(SYSTEM(), internalSpec.getSystemName());
+    assertEquals(STREAM, internalSpec.getPhysicalName());
+  }
+
+  @Test
   public void testCreateChangelogStreamDelegatesToCreateStream() {
     final String STREAM = "testChangeLogStream";
     final int PARTITIONS = 12;