You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/02/18 22:23:55 UTC

samza git commit: SAMZA-554: Simplify serde configuration by providing default serde names

Repository: samza
Updated Branches:
  refs/heads/master 41c74b968 -> 9ea3a526a


SAMZA-554: Simplify serde configuration by providing default serde names


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9ea3a526
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9ea3a526
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9ea3a526

Branch: refs/heads/master
Commit: 9ea3a526a9de0b4bc74f476bd6d565cd192da3d0
Parents: 41c74b9
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 13:23:17 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 13:23:17 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 27 ++++++++++++++++++--
 .../samza/container/TestSamzaContainer.scala    | 24 +++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac6e24f..2fc6c65 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -60,6 +60,7 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -107,6 +108,28 @@ object SamzaContainer extends Logging {
       .readValue(Util.read(new URL(url)), classOf[JobModel])
   }
 
+  /**
+   * Returns the serde produced by the matching default SerdeFactory for serdeName (byte, integer,
+   * json, long, serializable, string); other names throw SamzaException citing exceptionSystemName.
+   */
+  def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: String, config: Config) = {
+    info("looking for default serdes")
+    def getSerde(serdeFactory: String) = {
+      Util.getObj[SerdeFactory[Object]](serdeFactory).getSerde(serdeName, config)
+    }
+    val serde = serdeName match {
+      case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName)
+      case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName)
+      case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName)
+      case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName)
+      case "serializable" => getSerde(classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName)
+      case "string" => getSerde(classOf[StringSerdeFactory].getCanonicalName)
+      case _ => throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, exceptionSystemName))
+    }
+    info("use default serde %s for %s" format (serde, serdeName))
+    serde
+  }
+
   def apply(containerModel: ContainerModel, config: Config) = {
     val containerId = containerModel.getContainerId
     val containerName = "samza-container-%s" format containerId
@@ -222,7 +245,7 @@ object SamzaContainer extends Logging {
         .filter(getSerdeName(_).isDefined)
         .map(systemName => {
           val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+          val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemName, config))
           (systemName, serde)
         }).toMap
     }
@@ -235,7 +258,7 @@ object SamzaContainer extends Logging {
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+          val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemStream.toString, config))
           (systemStream, serde)
         }).toMap
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 19ceeaa..81742bc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -52,6 +52,8 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
 import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
@@ -168,4 +170,26 @@ class TestSamzaContainer extends AssertionsForJUnit {
     t.join
     assertTrue(caughtException)
   }
+
+  @Test
+  def testDefaultSerdesFromSerdeName {
+    import SamzaContainer._
+    val config = new MapConfig
+    assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde])
+    assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde])
+    assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde])
+    assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde])
+    assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
+    assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde])
+
+    // an unrecognized serde name must raise SamzaException; any other exception fails the assert below
+    var throwSamzaException = false
+    try {
+      defaultSerdesFromSerdeName("otherName", "testSystemException", config)
+    } catch {
+      case e: SamzaException => throwSamzaException = true
+      case _: Exception =>
+    }
+    assertTrue(throwSamzaException)
+  }
 }