You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/02/18 22:23:55 UTC
samza git commit: SAMZA-554: Simplify serde configuration by
providing default serde names
Repository: samza
Updated Branches:
refs/heads/master 41c74b968 -> 9ea3a526a
SAMZA-554: Simplify serde configuration by providing default serde names
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9ea3a526
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9ea3a526
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9ea3a526
Branch: refs/heads/master
Commit: 9ea3a526a9de0b4bc74f476bd6d565cd192da3d0
Parents: 41c74b9
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 13:23:17 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 13:23:17 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 27 ++++++++++++++++++--
.../samza/container/TestSamzaContainer.scala | 24 +++++++++++++++++
2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac6e24f..2fc6c65 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -60,6 +60,7 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -107,6 +108,28 @@ object SamzaContainer extends Logging {
.readValue(Util.read(new URL(url)), classOf[JobModel])
}
+ /**
+ * A helper function which returns system's default serde according to the
+ * serde name. If not found, throw exception.
+ */
+ def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: String, config: Config) = {
+ info("looking for default serdes")
+ def getSerde(serdeFactory: String) = {
+ Util.getObj[SerdeFactory[Object]](serdeFactory).getSerde(serdeName, config)
+ }
+ val serde = serdeName match {
+ case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName)
+ case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName)
+ case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName)
+ case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName)
+ case "serializable" => getSerde(classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName)
+ case "string" => getSerde(classOf[StringSerdeFactory].getCanonicalName)
+ case _ => throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, exceptionSystemName))
+ }
+ info("use default serde %s for %s" format (serde, serdeName))
+ serde
+ }
+
def apply(containerModel: ContainerModel, config: Config) = {
val containerId = containerModel.getContainerId
val containerName = "samza-container-%s" format containerId
@@ -222,7 +245,7 @@ object SamzaContainer extends Logging {
.filter(getSerdeName(_).isDefined)
.map(systemName => {
val serdeName = getSerdeName(systemName).get
- val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+ val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemName, config))
(systemName, serde)
}).toMap
}
@@ -235,7 +258,7 @@ object SamzaContainer extends Logging {
.filter(systemStream => getSerdeName(systemStream).isDefined)
.map(systemStream => {
val serdeName = getSerdeName(systemStream).get
- val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+ val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemStream.toString, config))
(systemStream, serde)
}).toMap
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 19ceeaa..81742bc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -52,6 +52,8 @@ import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit
import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
class TestSamzaContainer extends AssertionsForJUnit {
@Test
@@ -168,4 +170,26 @@ class TestSamzaContainer extends AssertionsForJUnit {
t.join
assertTrue(caughtException)
}
+
+ @Test
+ def testDefaultSerdesFromSerdeName {
+ import SamzaContainer._
+ val config = new MapConfig
+ assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde])
+ assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde])
+ assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde])
+ assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde])
+ assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
+ assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde])
+
+ // throw SamzaException if can not find the correct serde
+ var throwSamzaException = false
+ try {
+ defaultSerdesFromSerdeName("otherName", "testSystemException", config)
+ } catch {
+ case e: SamzaException => throwSamzaException = true
+ case _: Exception =>
+ }
+ assertTrue(throwSamzaException)
+ }
}