You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/06/18 20:45:25 UTC
git commit: SAMZA-296: Configuration cleanup
Repository: incubator-samza
Updated Branches:
refs/heads/master 64da867b2 -> f703f97d6
SAMZA-296: Configuration cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f703f97d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f703f97d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f703f97d
Branch: refs/heads/master
Commit: f703f97d6aa05597eb42a1be4d726bab6d6e3122
Parents: 64da867
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Wed Jun 18 10:07:57 2014 -0700
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Wed Jun 18 11:44:47 2014 -0700
----------------------------------------------------------------------
.../0.7.0/yarn/application-master.md | 2 +-
.../org/apache/samza/config/SystemConfig.scala | 2 +-
.../org/apache/samza/config/TaskConfig.scala | 3 --
.../org/apache/samza/config/KafkaConfig.scala | 8 +---
samza-test/src/main/resources/common.properties | 42 --------------------
.../org/apache/samza/config/YarnConfig.scala | 39 +++++-------------
.../yarn/TestSamzaAppMasterTaskManager.scala | 2 +-
7 files changed, 13 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/docs/learn/documentation/0.7.0/yarn/application-master.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/yarn/application-master.md b/docs/learn/documentation/0.7.0/yarn/application-master.md
index 30b14cd..6b81805 100644
--- a/docs/learn/documentation/0.7.0/yarn/application-master.md
+++ b/docs/learn/documentation/0.7.0/yarn/application-master.md
@@ -41,7 +41,7 @@ From this point on, the ApplicationMaster just reacts to events from the RM.
### Fault Tolerance
-Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.countainer.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
+Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.container.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
When the AM receives a reboot signal from YARN, it will throw a SamzaException. This will trigger a clean and successful shutdown of the AM (YARN won't think the AM failed).
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 5bb17c7..4cfdcc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -49,7 +49,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
*/
def getSystemNames() = {
val subConf = config.subset("systems.", true)
- // find all .samza.partition.manager keys, and strip the suffix
+ // find all .samza.factory keys, and strip the suffix
subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 3510f1f..18a9510 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -32,7 +32,6 @@ object TaskConfig {
val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
- val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
implicit def Config2Task(config: Config) = new TaskConfig(config)
@@ -70,7 +69,5 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
- def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
-
def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index b95e493..bdb416d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -33,9 +33,6 @@ object KafkaConfig {
val CHECKPOINT_SYSTEM = "task.checkpoint.system"
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
- val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
- val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
-
/**
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
@@ -51,13 +48,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
// custom consumer config
- def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
- def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
/**
* Returns a map of topic -> fetch.message.max.bytes value for all streams that
- * are defined with this proeprty in thec onfig.
+ * are defined with this property in the config.
*/
def getFetchMessageMaxBytesTopics(systemName: String) = {
val subConf = config.subset("systems.%s.streams." format systemName, true)
@@ -75,7 +70,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
*/
def getAutoOffsetResetTopics(systemName: String) = {
val subConf = config.subset("systems.%s.streams." format systemName, true)
- // find all .samza.partition.manager keys, and strip the suffix
subConf
.filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
.map {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-test/src/main/resources/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/common.properties b/samza-test/src/main/resources/common.properties
deleted file mode 100644
index 6e0c061..0000000
--- a/samza-test/src/main/resources/common.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-job.factory.class=samza.job.local.LocalJobFactory
-
-task.checkpoint.factory=samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka-checkpoints
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=samza.serializers.StringSerdeFactory
-
-# Kafka System
-systems.kafka.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.samza.msg.serde=string
-
-# Checkpoints System
-systems.kafka-checkpoints.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka-checkpoints.serializer.class=samza.task.state.KafkaCheckpointEncoder
-systems.kafka-checkpoints.partitioner.class=samza.task.state.KafkaCheckpointPartitioner
-systems.kafka-checkpoints.key.serializer.class=kafka.serializer.NullEncoder
-systems.kafka-checkpoints.producer.metadata.broker.list=localhost:9092
-systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
-systems.kafka-checkpoints.producer.type=sync
-
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 5756d3a..0cda102 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -24,7 +24,7 @@ object YarnConfig {
val PACKAGE_PATH = "yarn.package.path"
val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"
val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
- val CONTAINER_RETRY_COUNT = "yarn.countainer.retry.count"
+ val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
val TASK_COUNT = "yarn.container.count"
val AM_JVM_OPTIONS = "yarn.am.opts"
@@ -36,44 +36,23 @@ object YarnConfig {
}
class YarnConfig(config: Config) extends ScalaMapConfig(config) {
- def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB) match {
- case Some(mem) => Some(mem.toInt)
- case _ => None
- }
+ def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB).map(_.toInt)
- def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES) match {
- case Some(cpu) => Some(cpu.toInt)
- case _ => None
- }
+ def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES).map(_.toInt)
- def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT) match {
- case Some(count) => Some(count.toInt)
- case _ => None
- }
+ def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT).map(_.toInt)
- def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS) match {
- case Some(retryWindowMs) => Some(retryWindowMs.toInt)
- case _ => None
- }
+ def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS).map(_.toInt)
def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
- def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT) match {
- case Some(tc) => Some(tc.toInt)
- case _ => None
- }
+ def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt)
def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
-
- def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB) match {
- case Some(mem) => Some(mem.toInt)
- case _ => None
- }
- def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS) match {
- case Some(interval) => Some(interval.toInt)
- case _ => None
- }
+ def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
+
+ def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS).map(_.toInt)
def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f703f97d/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 0442580..f1139f5 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -149,7 +149,7 @@ class TestSamzaAppMasterTaskManager {
"task.inputs" -> "test-system.test-stream",
"systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
"systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
- "yarn.countainer.retry.count" -> "1",
+ "yarn.container.retry.count" -> "1",
"yarn.container.retry.window.ms" -> "1999999999"))
@Test