You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/06/18 20:45:25 UTC

git commit: SAMZA-296: Configuration cleanup

Repository: incubator-samza
Updated Branches:
  refs/heads/master 64da867b2 -> f703f97d6


SAMZA-296: Configuration cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f703f97d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f703f97d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f703f97d

Branch: refs/heads/master
Commit: f703f97d6aa05597eb42a1be4d726bab6d6e3122
Parents: 64da867
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Wed Jun 18 10:07:57 2014 -0700
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Wed Jun 18 11:44:47 2014 -0700

----------------------------------------------------------------------
 .../0.7.0/yarn/application-master.md            |  2 +-
 .../org/apache/samza/config/SystemConfig.scala  |  2 +-
 .../org/apache/samza/config/TaskConfig.scala    |  3 --
 .../org/apache/samza/config/KafkaConfig.scala   |  8 +---
 samza-test/src/main/resources/common.properties | 42 --------------------
 .../org/apache/samza/config/YarnConfig.scala    | 39 +++++-------------
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  2 +-
 7 files changed, 13 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/docs/learn/documentation/0.7.0/yarn/application-master.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/yarn/application-master.md b/docs/learn/documentation/0.7.0/yarn/application-master.md
index 30b14cd..6b81805 100644
--- a/docs/learn/documentation/0.7.0/yarn/application-master.md
+++ b/docs/learn/documentation/0.7.0/yarn/application-master.md
@@ -41,7 +41,7 @@ From this point on, the ApplicationMaster just reacts to events from the RM.
 
 ### Fault Tolerance
 
-Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.countainer.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
+Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.container.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
 
 When the AM receives a reboot signal from YARN, it will throw a SamzaException. This will trigger a clean and successful shutdown of the AM (YARN won't think the AM failed).
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 5bb17c7..4cfdcc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -49,7 +49,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    */
   def getSystemNames() = {
     val subConf = config.subset("systems.", true)
-    // find all .samza.partition.manager keys, and strip the suffix
+    // find all .samza.factory keys, and strip the suffix
     subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 3510f1f..18a9510 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -32,7 +32,6 @@ object TaskConfig {
   val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
-  val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
 
   implicit def Config2Task(config: Config) = new TaskConfig(config)
@@ -70,7 +69,5 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
 
-  def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
-  
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index b95e493..bdb416d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -33,9 +33,6 @@ object KafkaConfig {
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
 
-  val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
-  val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
-
   /**
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
@@ -51,13 +48,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
 
   // custom consumer config
-  def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
-  def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 
   /**
    * Returns a map of topic -> fetch.message.max.bytes value for all streams that
-   * are defined with this proeprty in thec onfig.
+   * are defined with this property in the config.
    */
   def getFetchMessageMaxBytesTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
@@ -75,7 +70,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
    */
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
-    // find all .samza.partition.manager keys, and strip the suffix
     subConf
       .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
       .map {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-test/src/main/resources/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/common.properties b/samza-test/src/main/resources/common.properties
deleted file mode 100644
index 6e0c061..0000000
--- a/samza-test/src/main/resources/common.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-job.factory.class=samza.job.local.LocalJobFactory
-
-task.checkpoint.factory=samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka-checkpoints
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=samza.serializers.StringSerdeFactory
-
-# Kafka System
-systems.kafka.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.samza.msg.serde=string
-
-# Checkpoints System
-systems.kafka-checkpoints.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka-checkpoints.serializer.class=samza.task.state.KafkaCheckpointEncoder
-systems.kafka-checkpoints.partitioner.class=samza.task.state.KafkaCheckpointPartitioner
-systems.kafka-checkpoints.key.serializer.class=kafka.serializer.NullEncoder
-systems.kafka-checkpoints.producer.metadata.broker.list=localhost:9092
-systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
-systems.kafka-checkpoints.producer.type=sync
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 5756d3a..0cda102 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -24,7 +24,7 @@ object YarnConfig {
   val PACKAGE_PATH = "yarn.package.path"
   val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"
   val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
-  val CONTAINER_RETRY_COUNT = "yarn.countainer.retry.count"
+  val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
   val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
   val TASK_COUNT = "yarn.container.count"
   val AM_JVM_OPTIONS = "yarn.am.opts"
@@ -36,44 +36,23 @@ object YarnConfig {
 }
 
 class YarnConfig(config: Config) extends ScalaMapConfig(config) {
-  def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB) match {
-    case Some(mem) => Some(mem.toInt)
-    case _ => None
-  }
+  def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB).map(_.toInt)
 
-  def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES) match {
-    case Some(cpu) => Some(cpu.toInt)
-    case _ => None
-  }
+  def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES).map(_.toInt)
 
-  def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT) match {
-    case Some(count) => Some(count.toInt)
-    case _ => None
-  }
+  def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT).map(_.toInt)
 
-  def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS) match {
-    case Some(retryWindowMs) => Some(retryWindowMs.toInt)
-    case _ => None
-  }
+  def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS).map(_.toInt)
 
   def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
 
-  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT) match {
-    case Some(tc) => Some(tc.toInt)
-    case _ => None
-  }
+  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt)
 
   def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
-  
-  def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB) match {
-    case Some(mem) => Some(mem.toInt)
-    case _ => None
-  }
 
-  def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS) match {
-    case Some(interval) => Some(interval.toInt)
-    case _ => None
-  }
+  def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
+
+  def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS).map(_.toInt)
 
   def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 0442580..f1139f5 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -149,7 +149,7 @@ class TestSamzaAppMasterTaskManager {
     "task.inputs" -> "test-system.test-stream",
     "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
     "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
-    "yarn.countainer.retry.count" -> "1",
+    "yarn.container.retry.count" -> "1",
     "yarn.container.retry.window.ms" -> "1999999999"))
 
   @Test