You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/04/14 19:09:12 UTC
samza git commit: SAMZA-1145: Provide Ability To Configure The Default Number Of Changelog Replicas
Repository: samza
Updated Branches:
refs/heads/master 34178a63f -> 925866d9b
SAMZA-1145: Provide Ability To Configure The Default Number Of Changelog Replicas
Author: James Lent <jl...@nc.rr.com>
Reviewers: Yi Pan <ni...@apache.org>, Jagadish <vj...@gmail.com>
Closes #86 from jwlent55/SAMZA-1145
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/925866d9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/925866d9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/925866d9
Branch: refs/heads/master
Commit: 925866d9b0a21fe49cd643544d8cad83f1c124bf
Parents: 34178a6
Author: James Lent <jl...@nc.rr.com>
Authored: Fri Apr 14 12:09:07 2017 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Apr 14 12:09:07 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 15 +++++++++---
.../org/apache/samza/config/KafkaConfig.scala | 4 +++-
.../samza/system/kafka/KafkaSystemFactory.scala | 2 +-
.../apache/samza/config/TestKafkaConfig.scala | 24 ++++++++++++++++++++
4 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index ba04139..df59547 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1272,13 +1272,21 @@
<tr>
<td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
- <td class="default">2</td>
+ <td class="default">stores.default.changelog.replication.factor</td>
<td class="description">
The property defines the number of replicas to use for the change log stream.
</td>
</tr>
<tr>
+ <td class="property" id="store-default-changelog-replication-factor">stores.default.changelog.replication.factor</td>
+ <td class="default">2</td>
+ <td class="description">
+ This property defines the default number of replicas to use for the change log stream.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td>
<td class="default"></td>
<td class="description">
@@ -1343,8 +1351,9 @@
<td class="description">
This property defines a store, Samza's mechanism for efficient
<a href="../container/state-management.html">stateful stream processing</a>. You can give a
- store any <span class="store">store-name</span>, and use that name to get a reference to the
- store in your stream task (call
+ store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span>
+ <em>default</em> is reserved for defining default store parameters), and use that name to get a
+ reference to the store in your stream task (call
<a href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
in your task's
<a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config, org.apache.samza.task.TaskContext)">init()</a>
http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index ae6330f..a8c1f3a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -52,6 +52,7 @@ object KafkaConfig {
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
+ val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
// The default segment size to use for changelog topics
val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
@@ -132,7 +133,8 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
- def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
+ def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+ def getDefaultChangelogStreamReplicationFactor = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse("2")
// The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
def getKafkaChangelogEnabledStores() = {
http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d0e3089..638806b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -118,7 +118,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
// Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
{
- val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt
+ val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
(topicName, changelogInfo)
http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 555ab9f..106a0d5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -192,4 +192,28 @@ class TestKafkaConfig {
val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
kafkaProducerConfig.getProducerProperties
}
+
+ @Test
+ def testChangeLogReplicationFactor() {
+ props.setProperty("stores.store-with-override.changelog.replication.factor", "3")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "3")
+ assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "2")
+ assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "2")
+ }
+
+ @Test
+ def testChangeLogReplicationFactorWithOverriddenDefault() {
+ props.setProperty("stores.store-with-override.changelog.replication.factor", "4")
+ // Override the "default" default value
+ props.setProperty("stores.default.changelog.replication.factor", "5")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "4")
+ assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "5")
+ assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "5")
+ }
}