You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/05 00:39:15 UTC
[1/2] samza git commit: SAMZA-1317: Changelog topic configuration
should accept streams.[streamId] configurations
Repository: samza
Updated Branches:
refs/heads/0.13.0 48b05c7d3 -> c74e03ca8
SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #212 from jmakes/samza-1317-migration-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c66c4877
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c66c4877
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c66c4877
Branch: refs/heads/0.13.0
Commit: c66c48770aacb65e3bf887e31c6efffb498cc9cf
Parents: 48b05c7
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 2 14:53:31 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Sun Jun 4 17:20:28 2017 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaSystemAdmin.scala | 8 +++-
.../system/kafka/TestKafkaSystemAdminJava.java | 46 ++++++++++++++++----
2 files changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c66c4877/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 8c90c6c..2f82754 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -35,6 +35,10 @@ import scala.collection.JavaConverters._
object KafkaSystemAdmin extends Logging {
+ // Use a placeholder string for the stream id. Only the physical name and partition count matter for changelog creation, so this placeholder id is not expected to be read anywhere.
+ // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+ val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+
/**
* A helper method that takes oldest, newest, and upcoming offsets for each
* system stream partition, and creates a single map from stream name to
@@ -490,10 +494,10 @@ class KafkaSystemAdmin(
class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
def this(s: String) = this(s, null)
}
-
+
override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
- val spec = new KafkaStreamSpec(topicName, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+ val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
if (createStream(spec)) {
info("Created changelog stream %s." format topicName)
http://git-wip-us.apache.org/repos/asf/samza/blob/c66c4877/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index f5bc73a..a47ba9d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,6 +19,9 @@
package org.apache.samza.system.kafka;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
@@ -27,13 +30,7 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@@ -67,7 +64,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
- StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS);
+ StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
+ admin.createChangelogStream(STREAM, PARTITIONS);
+ admin.validateStream(spec);
+
+ ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
+ Mockito.verify(admin).createStream(specCaptor.capture());
+
+ StreamSpec internalSpec = specCaptor.getValue();
+ assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
+ assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
+ assertEquals(SYSTEM(), internalSpec.getSystemName());
+ assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+ assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+ assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+ }
+
+ @Test
+ public void testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
+ final String STREAM = "test.Change_Log.Stream";
+ final int PARTITIONS = 12;
+ final int REP_FACTOR = 3;
+
+ Properties coordProps = new Properties();
+ Properties changeLogProps = new Properties();
+ changeLogProps.setProperty("cleanup.policy", "compact");
+ changeLogProps.setProperty("segment.bytes", "139");
+ Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+ changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+
+ SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
+ StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
admin.createChangelogStream(STREAM, PARTITIONS);
admin.validateStream(spec);
@@ -76,7 +104,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
StreamSpec internalSpec = specCaptor.getValue();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
- assertEquals(STREAM, internalSpec.getId());
+ assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
assertEquals(SYSTEM(), internalSpec.getSystemName());
assertEquals(STREAM, internalSpec.getPhysicalName());
assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
[2/2] samza git commit: SAMZA-1317: Changelog validation error for topics with period in the name
Posted by jm...@apache.org.
SAMZA-1317: Changelog validation error for topics with period in the name
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #213 from jmakes/samza-1317-migration-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c74e03ca
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c74e03ca
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c74e03ca
Branch: refs/heads/0.13.0
Commit: c74e03ca83d9518ad037efaf8214fda424856670
Parents: c66c487
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 2 20:42:14 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Sun Jun 4 17:20:42 2017 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaStreamSpec.java | 7 +++++--
.../samza/system/kafka/KafkaSystemAdmin.scala | 4 +++-
.../system/kafka/TestKafkaSystemAdminJava.java | 19 +++++++++++++++++++
3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c74e03ca/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 0477854..c7e82f7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -116,12 +116,15 @@ public class KafkaStreamSpec extends StreamSpec {
/**
* Convenience constructor to create a KafkaStreamSpec with just a topicName, systemName, and partitionCount.
*
+ * @param id The application-unique logical identifier for the stream. It is used to distinguish between
+ * streams in a Samza application so it must be unique in the context of one deployable unit.
+ * It does not need to be globally unique or unique with respect to a host.
* @param topicName The name of the topic.
* @param systemName The name of the System. See {@link org.apache.samza.system.SystemFactory}
* @param partitionCount The number of partitions.
*/
- public KafkaStreamSpec(String topicName, String systemName, int partitionCount) {
- this(topicName, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
+ public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount) {
+ this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/c74e03ca/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 2f82754..af77d5b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
import java.util
import java.util.{Properties, UUID}
+
import kafka.admin.AdminUtils
import kafka.api._
import kafka.common.TopicAndPartition
@@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadat
import org.apache.samza.system._
import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
import org.apache.samza.{Partition, SamzaException}
+
import scala.collection.JavaConverters._
@@ -514,7 +516,7 @@ class KafkaSystemAdmin(
* will auto-create a new topic.
*/
override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
- validateStream(new KafkaStreamSpec(topicName, systemName, numKafkaChangelogPartitions))
+ validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions))
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/c74e03ca/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index a47ba9d..ce59b40 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -132,6 +132,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
@Test
+ public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
+ final String STREAM = "test.Change_Log.Validate";
+ Properties coordProps = new Properties();
+ Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+ changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
+
+ KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
+ SystemAdmin admin = Mockito.spy(systemAdmin);
+ StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+
+ admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+ admin.validateStream(spec);
+ admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw
+
+ Mockito.verify(admin).createStream(Mockito.any());
+ Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
+ }
+
+ @Test
public void testCreateStream() {
SystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);