You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/01 20:52:26 UTC
samza git commit: SAMZA-1214: Bug - system-scoped stream default configs may not be honored
Repository: samza
Updated Branches:
refs/heads/master a1e03af0d -> 20b427cc8
SAMZA-1214: Bug - system-scoped stream default configs may not be honored
* Re-introduced deprecated system-stream configs into config table
* Fixed position of task.consumer.batch.size in config table
* Moved system-scoped defaults from StreamConfig to SystemConfig
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #150 from jmakes/samza-1214
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/20b427cc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/20b427cc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/20b427cc
Branch: refs/heads/master
Commit: 20b427cc88c5f8d2a6b2e0283ce18b4b5f01cc17
Parents: a1e03af
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon May 1 13:52:13 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon May 1 13:52:13 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 93 ++++++++++++++++++--
.../apache/samza/config/JavaStorageConfig.java | 2 +
.../apache/samza/config/JavaSystemConfig.java | 3 +
.../org/apache/samza/config/StreamConfig.scala | 7 +-
.../org/apache/samza/config/SystemConfig.scala | 16 +++-
.../apache/samza/config/TestStreamConfig.java | 3 +-
6 files changed, 105 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 0fc30c5..afa42f5 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -689,6 +689,17 @@
</tr>
<tr>
+ <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
+ <td class="default">1</td>
+ <td class="description">
+ If set to a positive integer, the task will try to consume
+ <a href="../container/streams.html#batching">batches</a> with the given number of messages
+ from each input stream, rather than consuming round-robin from all the input streams on
+ each individual message. Setting this property can improve performance in some cases.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="systems">Systems</th>
</tr>
@@ -785,13 +796,77 @@
</tr>
<tr>
- <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
- <td class="default">1</td>
+ <td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
+ <td class="default" rowspan="2"></td>
<td class="description">
- If set to a positive integer, the task will try to consume
- <a href="../container/streams.html#batching">batches</a> with the given number of messages
- from each input stream, rather than consuming round-robin from all the input streams on
- each individual message. Setting this property can improve performance in some cases.
+ This is deprecated in favor of <a href="#systems-samza-key-serde" class="property">
+ systems.<span class="system">system-name</span>.default.stream.samza.key.serde</a>.
+ </td>
+ </tr>
+ <tr>
+ <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
+ streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-msg-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
+ <td class="default" rowspan="2"></td>
+ <td class="description">
+ This is deprecated in favor of <a href="#systems-samza-msg-serde" class="property">
+ systems.<span class="system">system-name</span>.default.stream.samza.msg.serde</a>.
+ </td>
+ </tr>
+ <tr>
+ <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
+ streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-offset-default-legacy">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
+ <td class="default" rowspan="2">upcoming</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#systems-samza-offset-default" class="property">
+ systems.<span class="system">system-name</span>.default.stream.samza.offset.default</a>.
+ </td>
+ </tr>
+ <tr>
+ <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
+ streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-streams-samza-reset-offset-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td>
+ <td class="default">false</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-reset-offset" class="property">
+ streams.<span class="stream">stream-id</span>.samza.reset.offset</a>.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-streams-samza-priority-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td>
+ <td class="default">-1</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-priority" class="property">
+ streams.<span class="stream">stream-id</span>.samza.priority</a>.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-streams-samza-bootstrap-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td>
+ <td class="default">false</td>
+ <td class="description">
+ This is deprecated in favor of <a href="#streams-samza-bootstrap" class="property">
+ streams.<span class="stream">stream-id</span>.samza.bootstrap</a>.
</td>
</tr>
@@ -875,7 +950,7 @@
</tr>
<tr>
- <td class="property" id="streams-streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
+ <td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
<td class="default">false</td>
<td class="description">
If set to <code>true</code>, when a Samza container starts up, it ignores any
@@ -888,7 +963,7 @@
</tr>
<tr>
- <td class="property" id="streams-streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
+ <td class="property" id="streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
<td class="default">-1</td>
<td class="description">
If one or more streams have a priority set (any positive integer), they will be processed
@@ -901,7 +976,7 @@
</tr>
<tr>
- <td class="property" id="streams-streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
+ <td class="property" id="streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
<td class="default">false</td>
<td class="description">
If set to <code>true</code>, this stream will be processed as a
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index a1f0ec0..c26601c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -98,6 +98,8 @@ public class JavaStorageConfig extends MapConfig {
* stores.storeName.changelog=streamName
*
* If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used.
+ *
+ * @return the name of the system to use by default for all changelogs, if defined.
*/
public String getChangelogSystem() {
return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
index 6408438..350f20c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -103,6 +103,9 @@ public class JavaSystemConfig extends MapConfig {
/**
* Gets the system-wide defaults for streams.
+ *
+ * @param systemName the name of the system for which the defaults will be returned.
+ * @return a subset of the config with the system prefix removed.
*/
public Config getDefaultStreamProperties(String systemName) {
return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 43cc9a9..c490642 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -81,14 +81,11 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
* Returns a list of all SystemStreams that have a serde defined from the config file.
*/
def getSerdeStreams(systemName: String) = {
- val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName)
- val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE)
-
val subConf = config.subset("systems.%s.streams." format systemName, true)
val legacySystemStreams = subConf
.asScala
.keys
- .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
+ .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
.map(k => {
val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
new SystemStream(systemName, streamName)
@@ -97,7 +94,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
.asScala
.keys
- .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
+ .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
.map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
.filter(streamId => systemName.equals(getSystem(streamId)))
.map(streamId => streamIdToSystemStream(streamId)).toSet
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 69fc383..804955c 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -29,8 +29,6 @@ object SystemConfig {
// system config constants
val SYSTEM_PREFIX = "systems.%s."
val SYSTEM_FACTORY = "systems.%s.samza.factory"
- val KEY_SERDE = "systems.%s.samza.key.serde"
- val MSG_SERDE = "systems.%s.samza.msg.serde"
val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
implicit def Config2System(config: Config) = new SystemConfig(config)
@@ -39,9 +37,9 @@ object SystemConfig {
class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name)
- def getSystemKeySerde(name: String) = getNonEmptyOption(SystemConfig.KEY_SERDE format name)
+ def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
- def getSystemMsgSerde(name: String) = getNonEmptyOption(SystemConfig.MSG_SERDE format name)
+ def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE)
def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
@@ -54,4 +52,14 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
// find all .samza.factory keys, and strip the suffix
subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
}
+ /** Resolves a system-level stream default: returns the non-empty value of `property` from the system's default stream properties (JavaSystemConfig.getDefaultStreamProperties, i.e. systems.<name>.default.stream.*) if present, otherwise falls back to getNonEmptyOption on the legacy key systems.<name>.<property>. Empty strings are treated as absent. */
+ private def getSystemDefaultStreamProperty(name: String, property: String) = {
+ val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name)
+ val streamDefault = defaultStreamProperties.get(property)
+ if (!(streamDefault == null || streamDefault.isEmpty)) {
+ Option(streamDefault)
+ } else {
+ getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
index 4580cc4..f6a617c 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
@@ -216,7 +216,8 @@ public class TestStreamConfig {
// Ensure that we can set legacy system properties via the new system wide default
assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
- assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+ assertEquals(0, config.getSerdeStreams(STREAM1_SYSTEM).size());
+ assertEquals("value1", new SystemConfig(config).getSystemKeySerde(STREAM1_SYSTEM).get());
assertEquals("newest", config.getDefaultStreamOffset(SYSTEM_STREAM_1).get());
// Property set via systems.x.default.stream.* only