You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/16 21:59:33 UTC

kafka git commit: KAFKA-2032: validate consumer's partition-assignment config; reviewed by Jason Rosenberg, Sriharsha Chintalapani and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 83d17e5b2 -> fa03a7c6c


KAFKA-2032: validate consumer's partition-assignment config; reviewed by Jason Rosenberg, Sriharsha Chintalapani and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa03a7c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa03a7c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa03a7c6

Branch: refs/heads/trunk
Commit: fa03a7c6c48550f01176402139201af75d3836a2
Parents: 83d17e5
Author: Parth Brahmbhatt <pb...@hortonworks.com>
Authored: Thu Jul 16 12:59:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 16 12:59:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java |  3 ++-
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala   | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fa03a7c6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index daff34d..70377ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -173,7 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         SESSION_TIMEOUT_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.STRING,
-                                        "blah",
+                                        "range",
+                                        in("range", "roundrobin"),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa03a7c6/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 0199317..97a56ce 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -58,6 +58,7 @@ object ConsumerConfig extends Config {
     validateGroupId(config.groupId)
     validateAutoOffsetReset(config.autoOffsetReset)
     validateOffsetsStorage(config.offsetsStorage)
+    validatePartitionAssignmentStrategy(config.partitionAssignmentStrategy)
   }
 
   def validateClientId(clientId: String) {
@@ -85,6 +86,15 @@ object ConsumerConfig extends Config {
                                                  "Valid values are 'zookeeper' and 'kafka'")
     }
   }
+
+  def validatePartitionAssignmentStrategy(strategy: String) {
+    strategy match {
+      case "range" =>
+      case "roundrobin" =>
+      case _ => throw new InvalidConfigException("Wrong value " + strategy + " of partition.assignment.strategy in consumer config; " +
+        "Valid values are 'range' and 'roundrobin'")
+    }
+  }
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {