You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/16 21:59:33 UTC
kafka git commit: KAFKA-2032: validate consumer's
partition-assignment config;
reviewed by Jason Rosenberg, Sriharsha Chintalapani and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 83d17e5b2 -> fa03a7c6c
KAFKA-2032: validate consumer's partition-assignment config; reviewed by Jason Rosenberg, Sriharsha Chintalapani and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa03a7c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa03a7c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa03a7c6
Branch: refs/heads/trunk
Commit: fa03a7c6c48550f01176402139201af75d3836a2
Parents: 83d17e5
Author: Parth Brahmbhatt <pb...@hortonworks.com>
Authored: Thu Jul 16 12:59:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 16 12:59:27 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 3 ++-
core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 10 ++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa03a7c6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index daff34d..70377ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -173,7 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
SESSION_TIMEOUT_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.STRING,
- "blah",
+ "range",
+ in("range", "roundrobin"),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa03a7c6/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 0199317..97a56ce 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -58,6 +58,7 @@ object ConsumerConfig extends Config {
validateGroupId(config.groupId)
validateAutoOffsetReset(config.autoOffsetReset)
validateOffsetsStorage(config.offsetsStorage)
+ validatePartitionAssignmentStrategy(config.partitionAssignmentStrategy)
}
def validateClientId(clientId: String) {
@@ -85,6 +86,15 @@ object ConsumerConfig extends Config {
"Valid values are 'zookeeper' and 'kafka'")
}
}
+
+ def validatePartitionAssignmentStrategy(strategy: String) {
+ strategy match {
+ case "range" =>
+ case "roundrobin" =>
+ case _ => throw new InvalidConfigException("Wrong value " + strategy + " of partition.assignment.strategy in consumer config; " +
+ "Valid values are 'range' and 'roundrobin'")
+ }
+ }
}
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {