You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/15 02:24:34 UTC

samza git commit: SAMZA-1060 add new config job.changelog.system

Repository: samza
Updated Branches:
  refs/heads/master 1d458050c -> 97edf105e


SAMZA-1060 add new config job.changelog.system

SAMZA-1060.
Allow the changelog system to be specified separately via job.changelog.system, so that each store only needs to specify its changelog stream name.
If a store specifies both (system and stream, as <system>.<stream>), that value takes precedence over the job.changelog.system setting.

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>

Reviewers: navina

Closes #31 from sborya/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/97edf105
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/97edf105
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/97edf105

Branch: refs/heads/master
Commit: 97edf105edec1d14091ca3eb1fc2449c28817834
Parents: 1d45805
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Authored: Wed Dec 14 18:24:10 2016 -0800
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Wed Dec 14 18:24:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/samza/config/StorageConfig.scala | 26 ++++++++++++-
 .../apache/samza/config/TestStorageConfig.scala | 40 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/97edf105/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index be3f106..a3587d0 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -19,6 +19,9 @@
 
 package org.apache.samza.config
 
+
+import org.apache.samza.SamzaException
+
 import scala.collection.JavaConversions._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -29,6 +32,7 @@ object StorageConfig {
   val KEY_SERDE = "stores.%s.key.serde"
   val MSG_SERDE = "stores.%s.msg.serde"
   val CHANGELOG_STREAM = "stores.%s.changelog"
+  val CHANGELOG_SYSTEM = "job.changelog.system"
 
   implicit def Config2Storage(config: Config) = new StorageConfig(config)
 }
@@ -38,7 +42,27 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
   def getStorageFactoryClassName(name: String) = getOption(FACTORY.format(name))
   def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name)
   def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
-  def getChangelogStream(name: String) = getOption(CHANGELOG_STREAM format name)
+  def getChangelogStream(name: String) = {
+    // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
+    // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
+    // these values will be combined into <asystem>.<astream>
+    val systemStream = getOption(CHANGELOG_STREAM format name)
+    val changelogSystem = getOption(CHANGELOG_SYSTEM)
+    val systemStreamRes =
+      if ( systemStream.isDefined  && ! systemStream.getOrElse("").contains('.')) {
+        // contains only stream name
+        if (changelogSystem.isDefined) {
+          Some(changelogSystem.get + "." + systemStream.get)
+        }
+        else {
+          throw new SamzaException("changelog system is not defined:" + systemStream.get)
+        }
+      } else {
+        systemStream
+      }
+    systemStreamRes
+  }
+
   def getStoreNames: Seq[String] = {
     val conf = config.subset("stores.", true)
     conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq

http://git-wip-us.apache.org/repos/asf/samza/blob/97edf105/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
index 81a35ec..8284b3a 100644
--- a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
@@ -19,22 +19,54 @@
 
 package org.apache.samza.config
 
+
+import org.junit.Test
+
 import scala.collection.JavaConversions._
 import org.apache.samza.config.StorageConfig._
 import org.junit.Assert.assertFalse
 import org.junit.Assert.assertTrue
-import org.junit.Test
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
 
 class TestStorageConfig {
   @Test
   def testIsChangelogSystem {
     val configMap = Map[String, String](
-      FACTORY.format("system1") -> "some.factory.Class",
-      CHANGELOG_STREAM.format("system1") -> "system1.stream1",
-      FACTORY.format("system2") -> "some.factory.Class")
+      FACTORY.format("store1") -> "some.factory.Class",
+      CHANGELOG_STREAM.format("store1") -> "system1.stream1",
+      FACTORY.format("store2") -> "some.factory.Class")
     val config = new MapConfig(configMap)
     assertFalse(config.isChangelogSystem("system3"))
     assertFalse(config.isChangelogSystem("system2"))
     assertTrue(config.isChangelogSystem("system1"))
   }
+
+  @Test
+  def testIsChangelogSystemSetting {
+    val configMap = Map[String, String](
+      FACTORY.format("store1") -> "some.factory.Class",
+      CHANGELOG_STREAM.format("store1") -> "system1.stream1",
+      CHANGELOG_SYSTEM -> "system2",
+      CHANGELOG_STREAM.format("store2") -> "stream2",
+      CHANGELOG_STREAM.format("store4") -> "stream4",
+      FACTORY.format("store2") -> "some.factory.Class")
+    val config = new MapConfig(configMap)
+    assertFalse(config.isChangelogSystem("system3"))
+    assertTrue(config.isChangelogSystem("system2"))
+    assertTrue(config.isChangelogSystem("system1"))
+
+    assertEquals("system1.stream1", config.getChangelogStream("store1").getOrElse(""));
+    assertEquals("system2.stream2", config.getChangelogStream("store2").getOrElse(""));
+
+    val configMapErr = Map[String, String](CHANGELOG_STREAM.format("store4")->"stream4")
+    val configErr = new MapConfig(configMapErr)
+
+    try {
+      configErr.getChangelogStream("store4").getOrElse("")
+      fail("store4 has no system defined. Should've failed.");
+    } catch {
+       case e: Exception => // do nothing, it is expected
+    }
+  }
 }
\ No newline at end of file