You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/15 02:24:34 UTC
samza git commit: SAMZA-1060 add new config job.changelog.system
Repository: samza
Updated Branches:
refs/heads/master 1d458050c -> 97edf105e
SAMZA-1060 add new config job.changelog.system
SAMZA-1060.
Allow to specify a changelog system separately, so user can only specify stream name for each store.
If user specifies both (system and stream) it overwrites the job.changelog.system setting.
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Reviewers: navina
Closes #31 from sborya/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/97edf105
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/97edf105
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/97edf105
Branch: refs/heads/master
Commit: 97edf105edec1d14091ca3eb1fc2449c28817834
Parents: 1d45805
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Authored: Wed Dec 14 18:24:10 2016 -0800
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Wed Dec 14 18:24:10 2016 -0800
----------------------------------------------------------------------
.../org/apache/samza/config/StorageConfig.scala | 26 ++++++++++++-
.../apache/samza/config/TestStorageConfig.scala | 40 ++++++++++++++++++--
2 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/97edf105/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index be3f106..a3587d0 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -19,6 +19,9 @@
package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+
import scala.collection.JavaConversions._
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
@@ -29,6 +32,7 @@ object StorageConfig {
val KEY_SERDE = "stores.%s.key.serde"
val MSG_SERDE = "stores.%s.msg.serde"
val CHANGELOG_STREAM = "stores.%s.changelog"
+ val CHANGELOG_SYSTEM = "job.changelog.system"
implicit def Config2Storage(config: Config) = new StorageConfig(config)
}
@@ -38,7 +42,27 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
def getStorageFactoryClassName(name: String) = getOption(FACTORY.format(name))
def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name)
def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
- def getChangelogStream(name: String) = getOption(CHANGELOG_STREAM format name)
+ def getChangelogStream(name: String) = {
+ // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
+ // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
+ // these values will be combined into <asystem>.<astream>
+ val systemStream = getOption(CHANGELOG_STREAM format name)
+ val changelogSystem = getOption(CHANGELOG_SYSTEM)
+ val systemStreamRes =
+ if ( systemStream.isDefined && ! systemStream.getOrElse("").contains('.')) {
+ // contains only stream name
+ if (changelogSystem.isDefined) {
+ Some(changelogSystem.get + "." + systemStream.get)
+ }
+ else {
+ throw new SamzaException("changelog system is not defined:" + systemStream.get)
+ }
+ } else {
+ systemStream
+ }
+ systemStreamRes
+ }
+
def getStoreNames: Seq[String] = {
val conf = config.subset("stores.", true)
conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
http://git-wip-us.apache.org/repos/asf/samza/blob/97edf105/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
index 81a35ec..8284b3a 100644
--- a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
@@ -19,22 +19,54 @@
package org.apache.samza.config
+
+import org.junit.Test
+
import scala.collection.JavaConversions._
import org.apache.samza.config.StorageConfig._
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
-import org.junit.Test
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
class TestStorageConfig {
@Test
def testIsChangelogSystem {
val configMap = Map[String, String](
- FACTORY.format("system1") -> "some.factory.Class",
- CHANGELOG_STREAM.format("system1") -> "system1.stream1",
- FACTORY.format("system2") -> "some.factory.Class")
+ FACTORY.format("store1") -> "some.factory.Class",
+ CHANGELOG_STREAM.format("store1") -> "system1.stream1",
+ FACTORY.format("store2") -> "some.factory.Class")
val config = new MapConfig(configMap)
assertFalse(config.isChangelogSystem("system3"))
assertFalse(config.isChangelogSystem("system2"))
assertTrue(config.isChangelogSystem("system1"))
}
+
+ @Test
+ def testIsChangelogSystemSetting {
+ val configMap = Map[String, String](
+ FACTORY.format("store1") -> "some.factory.Class",
+ CHANGELOG_STREAM.format("store1") -> "system1.stream1",
+ CHANGELOG_SYSTEM -> "system2",
+ CHANGELOG_STREAM.format("store2") -> "stream2",
+ CHANGELOG_STREAM.format("store4") -> "stream4",
+ FACTORY.format("store2") -> "some.factory.Class")
+ val config = new MapConfig(configMap)
+ assertFalse(config.isChangelogSystem("system3"))
+ assertTrue(config.isChangelogSystem("system2"))
+ assertTrue(config.isChangelogSystem("system1"))
+
+ assertEquals("system1.stream1", config.getChangelogStream("store1").getOrElse(""));
+ assertEquals("system2.stream2", config.getChangelogStream("store2").getOrElse(""));
+
+ val configMapErr = Map[String, String](CHANGELOG_STREAM.format("store4")->"stream4")
+ val configErr = new MapConfig(configMapErr)
+
+ try {
+ configErr.getChangelogStream("store4").getOrElse("")
+ fail("store4 has no system defined. Should've failed.");
+ } catch {
+ case e: Exception => // do nothing, it is expected
+ }
+ }
}
\ No newline at end of file