You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:44 UTC
[03/26] samza git commit: SAMZA-1387: Unable to Start Samza App Because Regex Check
SAMZA-1387: Unable to Start Samza App Because Regex Check
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Fred Ji <fr...@yahoo.com>
Closes #266 from jmakes/samza-1387
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06702af8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06702af8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06702af8
Branch: refs/heads/0.14.0
Commit: 06702af8fda1d016ae55461c404b55b84b20ffd2
Parents: 8f7f567
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Aug 11 09:28:20 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Aug 11 09:28:20 2017 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaSystemAdmin.scala | 7 ++++---
.../system/kafka/TestKafkaSystemAdminJava.java | 19 +++++++++++++++++++
2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index af77d5b..1e59b61 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -38,8 +38,9 @@ import scala.collection.JavaConverters._
object KafkaSystemAdmin extends Logging {
// Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
- // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+ // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and SAMZA-1387
val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+ val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id"
/**
* A helper method that takes oldest, newest, and upcoming offsets for each
@@ -331,7 +332,7 @@ class KafkaSystemAdmin(
override def createCoordinatorStream(streamName: String) {
info("Attempting to create coordinator stream %s." format streamName)
- val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+ val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
if (createStream(streamSpec)) {
info("Created coordinator stream %s." format streamName)
@@ -496,7 +497,7 @@ class KafkaSystemAdmin(
class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
def this(s: String) = this(s, null)
}
-
+
override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index ce59b40..33c4017 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -51,6 +51,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
@Test
+ public void testCreateCoordinatorStreamDelegatesToCreateStream_specialCharsInTopicName() {
+ final String STREAM = "test.Coord_inator.Stream";
+
+ SystemAdmin admin = Mockito.spy(createSystemAdmin());
+ StreamSpec spec = new StreamSpec(KafkaSystemAdmin.COORDINATOR_STREAMID(), STREAM, SYSTEM()); // same placeholder id that createCoordinatorStream passes (asserted below)
+ admin.createCoordinatorStream(STREAM);
+ admin.validateStream(spec);
+
+ ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
+ Mockito.verify(admin).createStream(specCaptor.capture());
+
+ StreamSpec internalSpec = specCaptor.getValue();
+ assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
+ assertEquals(KafkaSystemAdmin.COORDINATOR_STREAMID(), internalSpec.getId());
+ assertEquals(SYSTEM(), internalSpec.getSystemName());
+ assertEquals(STREAM, internalSpec.getPhysicalName());
+ }
+
+ @Test
public void testCreateChangelogStreamDelegatesToCreateStream() {
final String STREAM = "testChangeLogStream";
final int PARTITIONS = 12;