Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/24 08:11:20 UTC

git commit: FLUME-2470. Kafka Sink and Source must use camel case for all configs.

Repository: flume
Updated Branches:
  refs/heads/trunk 186a3b808 -> bde2c2821


FLUME-2470. Kafka Sink and Source must use camel case for all configs.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bde2c282
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bde2c282
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bde2c282

Branch: refs/heads/trunk
Commit: bde2c28211a2d05a9930f1599cb15864ad3cdba0
Parents: 186a3b8
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Sep 23 23:10:25 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Sep 23 23:10:25 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 67 ++++++++-------
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 65 ++++++---------
 .../flume/sink/kafka/KafkaSinkConstants.java    | 20 ++++-
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 23 ++----
 .../src/test/resources/kafka-server.properties  |  1 +
 .../src/test/resources/log4j.properties         |  2 +-
 .../apache/flume/source/kafka/KafkaSource.java  | 40 ++++++---
 .../source/kafka/KafkaSourceConstants.java      | 15 ++--
 .../flume/source/kafka/KafkaSourceUtil.java     | 87 +++++++++++++++-----
 .../flume/source/kafka/KafkaSourceTest.java     | 12 ++-
 .../flume/source/kafka/KafkaSourceUtilTest.java | 43 +++++++---
 11 files changed, 232 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 11c1ad7..ce52946 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1101,29 +1101,33 @@ Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
 so each will read a unique set of partitions for the topic.
 
-The properties below are required properties, but you can specify any Kafka parameter you want
-and it will be passed to the consumer. Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_
-for details
-
-===========================     ===========  ===================================================
-Property Name                   Default      Description
-===========================     ===========  ===================================================
-**channels**                    --
-**type**                        --           The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource``
-**kafka.zookeeper.connect**     --           URI of ZooKeeper used by Kafka cluster
-**kadka.group.id**              --           Unique identified of consumer group. Setting the same id in multiple sources or agents
-                                             indicates that they are part of the same consumer group
-**topic**                       --           Kafka topic we'll read messages from. At the time, this is a single topic only.
-batchSize                       1000         Maximum number of messages written to Channel in one batch
-batchDurationMillis             1000         Maximum time (in ms) before a batch will be written to Channel
-                                             The batch will be written whenever the first of size and time will be reached.
-kafka.auto.commit.enable        false        If true, Kafka will commit events automatically - faster but less durable option.
-                                             when false, the Kafka Source will commit events before writing batch to channel
-consumer.timeout.ms             10           Polling interval for new data for batch.
-                                             Low value means more CPU usage.
-                                             High value means the maxBatchDurationMillis may be missed while waiting for
-                                             additional data.
-===========================     ===========  ===================================================
+
+
+===============================  ===========  ===================================================
+Property Name                    Default      Description
+===============================  ===========  ===================================================
+**channels**                     --
+**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**zookeeperConnect**             --           URI of ZooKeeper used by Kafka cluster
+**groupId**                      flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
+                                              indicates that they are part of the same consumer group.
+**topic**                        --           Kafka topic we'll read messages from. At this time, this is a single topic only.
+batchSize                        1000         Maximum number of messages written to Channel in one batch
+batchDurationMillis              1000         Maximum time (in ms) before a batch will be written to the channel.
+                                              The batch will be written as soon as either the size or the time limit is reached.
+Other Kafka Consumer Properties  --           These properties are used to configure the Kafka Consumer. Any consumer property supported
+                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+                                              For example: kafka.consumer.timeout.ms
+                                              Check the `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ for details
+===============================  ===========  ===================================================
+
+.. note:: The Kafka Source overrides two Kafka consumer parameters:
+          auto.commit.enable is set to "false" by the source, and we commit every batch. For improved performance
+          this can be set to "true"; however, this can lead to loss of data.
+          consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive.
+          Setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means
+          higher latency in writing batches to the channel (since we'll wait longer for data to arrive).
+
 
 Example for agent named tier1:
 
@@ -1131,9 +1135,9 @@ Example for agent named tier1:
 
     tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
     tier1.sources.source1.channels = channel1
-    tier1.sources.source1.kafka.zookeeper.connect = localhost:2181
+    tier1.sources.source1.zookeeperConnect = localhost:2181
     tier1.sources.source1.topic = test1
-    tier1.sources.source1.kafka.group.id = flume
+    tier1.sources.source1.groupId = flume
     tier1.sources.source1.kafka.consumer.timeout.ms = 100
 
 
@@ -2152,7 +2156,7 @@ Required properties are marked in bold font.
 Property Name                    Default              Description
 ===============================  ===================  =============================================================================================
 **type**                         --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
-**kafka.metadata.broker.list**   --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+**brokerList**                   --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
                                                       This can be a partial list of brokers, but we recommend at least two for HA.
                                                       The format is comma separated list of hostname:port
 topic                            default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
@@ -2160,13 +2164,12 @@ topic                            default-flume-topic  The topic in Kafka to whic
                                                       If the event header contains a "topic" field, the event will be published to that topic
                                                       overriding the topic configured here.
 batchSize                        100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
-kafka.request.required.acks      0                    How many replicas must acknowledge a message before its considered successfully written.
+requiredAcks                     1                    How many replicas must acknowledge a message before it's considered successfully written.
                                                       Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
-                                                      The default is the fastest option, but we *highly recommend* setting this to -1 to avoid data loss
-kafka.producer.type              sync                 Whether messages should be sent to broker synchronously or using an asynchronous background thread.
-                                                      Accepted values are sync (safest) and async (faster but potentially unsafe)
+                                                      Set this to -1 to avoid data loss in some cases of leader failure.
 Other Kafka Producer Properties  --                   These properties are used to configure the Kafka Producer. Any producer property supported
                                                       by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+                                                      For example: kafka.producer.type
 ===============================  ===================  =============================================================================================
 
 .. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
@@ -2186,8 +2189,8 @@ argument.
 
     a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
     a1.sinks.k1.topic = mytopic
-    a1.sinks.k1.kafka.metadata.broker.list = localhost:9092
-    a1.sinks.k1.kafka.request.required.acks = 1
+    a1.sinks.k1.brokerList = localhost:9092
+    a1.sinks.k1.requiredAcks = 1
     a1.sinks.k1.batchSize = 20
     a1.sinks.k1.channel = c1
 

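For reference, here is a minimal end-to-end agent sketch that wires the
renamed source and sink properties together. The hostnames, ports, topic
names, and channel sizing below are illustrative assumptions, not part of
this commit:

    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    tier1.sources.source1.zookeeperConnect = localhost:2181
    tier1.sources.source1.topic = incoming-topic
    tier1.sources.source1.groupId = flume
    # anything prefixed with "kafka." is passed through to the Kafka consumer
    tier1.sources.source1.kafka.consumer.timeout.ms = 100

    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000

    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.brokerList = localhost:9092
    tier1.sinks.sink1.topic = outgoing-topic
    tier1.sinks.sink1.requiredAcks = -1
    # anything prefixed with "kafka." is passed through to the Kafka producer
    tier1.sinks.sink1.kafka.producer.type = sync
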
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index a6121ac..a90b950 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -28,10 +28,10 @@ import org.apache.flume.sink.AbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Properties;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.ArrayList;
 
 /**
  * A Flume Sink that can publish messages to Kafka.
@@ -43,11 +43,7 @@ import java.util.Properties;
  * partition key
  * <p/>
  * Mandatory properties are:
- * kafka.metadata.broker.list -- can be a partial list,
- * but at least 2 are recommended for HA
- * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one
- * broker), -1 (accepted by all brokers)
- * kafka.producer.type -- for safety, this should be sync
+ * brokerList -- can be a partial list, but at least 2 are recommended for HA
  * <p/>
  * <p/>
  * however, any property starting with "kafka." will be passed along to the
@@ -60,6 +56,8 @@ import java.util.Properties;
  * different topics
  * batchSize - how many messages to process in one batch. Larger batches
  * improve throughput while adding latency.
+ * requiredAcks -- 0 (unsafe), 1 (accepted by at least one broker, default),
+ * -1 (accepted by all brokers)
  * <p/>
  * header properties (per event):
  * topic
@@ -70,7 +68,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
   private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
   public static final String KEY_HDR = "key";
   public static final String TOPIC_HDR = "topic";
-  private Properties producerProps;
+  private Properties kafkaProps;
   private Producer<String, byte[]> producer;
   private String topic;
   private int batchSize;
@@ -154,7 +152,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
   @Override
   public synchronized void start() {
     // instantiate the producer
-    ProducerConfig config = new ProducerConfig(producerProps);
+    ProducerConfig config = new ProducerConfig(kafkaProps);
     producer = new Producer<String, byte[]>(config);
     super.start();
   }
@@ -166,54 +164,43 @@ public class KafkaSink extends AbstractSink implements Configurable {
   }
 
 
+  /**
+   * We configure the sink and generate properties for the Kafka Producer
+   *
+   * Kafka producer properties are generated as follows:
+   * 1. We generate a properties object with some static defaults that
+   * can be overridden by Sink configuration
+   * 2. We add the configuration users added for Kafka (parameters starting
+   * with "kafka."), which must be valid Kafka Producer properties
+   * 3. We add the sink's documented parameters, which can override other
+   * properties
+   *
+   * @param context
+   */
   @Override
   public void configure(Context context) {
 
     batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
       KafkaSinkConstants.DEFAULT_BATCH_SIZE);
-    logger.debug("Using batch size: {}", batchSize);
     messageList =
       new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
-    Map<String, String> params = context.getParameters();
-    logger.debug("all params: " + params.entrySet().toString());
-    setProducerProps(params);
-    if (!producerProps.contains("serializer.class")) {
-      producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
-    }
-    if (!producerProps.contains("key.serializer.class")) {
-      producerProps.put("key.serializer.class",
-        "kafka.serializer.StringEncoder");
-    }
+    logger.debug("Using batch size: {}", batchSize);
 
     topic = context.getString(KafkaSinkConstants.TOPIC,
       KafkaSinkConstants.DEFAULT_TOPIC);
     if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
-      logger.warn("The Properties 'preprocessor' or 'topic' is not set. " +
-        "Using the default topic name" +
+      logger.warn("The Property 'topic' is not set. " +
+        "Using the default topic name: " +
         KafkaSinkConstants.DEFAULT_TOPIC);
     } else {
       logger.info("Using the static topic: " + topic +
         " this may be over-ridden by event headers");
     }
-  }
 
+    kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
 
-  private void setProducerProps(Map<String, String> params) {
-    producerProps = new Properties();
-    for (String key : params.keySet()) {
-      String value = params.get(key).trim();
-      key = key.trim();
-      if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) {
-        // remove the prefix
-        key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1,
-          key.length());
-        producerProps.put(key.trim(), value);
-        if (logger.isDebugEnabled()) {
-          logger.debug("Reading a Kafka Producer Property: key: " + key +
-            ", value: " + value);
-        }
-      }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Kafka producer properties: " + kafkaProps);
     }
   }
-
 }

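Note that the new configure() delegates property assembly to
KafkaSinkUtil.getKafkaProperties(), which is not part of this diff. A
hypothetical sketch of that method, inferred from the constants added in
KafkaSinkConstants.java below and mirroring the KafkaSourceUtil change
further down, might look like this (illustration only, not the committed
implementation):

    package org.apache.flume.sink.kafka;

    import java.util.Map;
    import java.util.Properties;

    import org.apache.flume.Context;
    import org.apache.flume.conf.ConfigurationException;

    public class KafkaSinkUtil {

      public static Properties getKafkaProperties(Context context) {
        // 1. Static defaults that sink configuration may override
        Properties props = new Properties();
        props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
                KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
        props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
                KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
        props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
                KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);

        // 2. Pass through every "kafka."-prefixed parameter, prefix stripped
        Map<String, String> kafkaProperties =
                context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
        for (Map.Entry<String, String> prop : kafkaProperties.entrySet()) {
          props.put(prop.getKey(), prop.getValue());
        }

        // 3. Documented camel-case parameters override everything else
        String brokerList =
                context.getString(KafkaSinkConstants.BROKER_LIST_FLUME_KEY);
        if (brokerList == null) {
          throw new ConfigurationException(
                  "brokerList must contain at least one Kafka broker");
        }
        props.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);

        String requiredAcks =
                context.getString(KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
        if (requiredAcks != null) {
          props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredAcks);
        }
        return props;
      }
    }
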
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 48d875e..3ee12de 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -18,14 +18,30 @@
 
 package org.apache.flume.sink.kafka;
 
+import kafka.serializer.StringDecoder;
+
 public class KafkaSinkConstants {
 
-  public static final String PROPERTY_PREFIX = "kafka";
+  public static final String PROPERTY_PREFIX = "kafka.";
 
   /* Properties */
-  public static final String DEFAULT_TOPIC = "default-flume-topic";
+
   public static final String TOPIC = "topic";
   public static final String BATCH_SIZE = "batchSize";
+  public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
+  public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
+  public static final String BROKER_LIST_KEY = "metadata.broker.list";
+  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
+  public static final String BROKER_LIST_FLUME_KEY = "brokerList";
+  public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
+
+
 
   public static final int DEFAULT_BATCH_SIZE = 100;
+  public static final String DEFAULT_TOPIC = "default-flume-topic";
+  public static final String DEFAULT_MESSAGE_SERIALIZER =
+          "kafka.serializer.DefaultEncoder";
+  public static final String DEFAULT_KEY_SERIALIZER =
+          "kafka.serializer.StringEncoder";
+  public static final String DEFAULT_REQUIRED_ACKS = "1";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index aed6dac..80f764f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -126,7 +127,7 @@ public class TestKafkaSink {
     kafkaSink.setChannel(memoryChannel);
     kafkaSink.start();
 
-    String msg = "my message";
+    String msg = "test-topic-and-key-from-header";
     Map<String, String> headers = new HashMap<String, String>();
     headers.put("topic", TestConstants.CUSTOM_TOPIC);
     headers.put("key", TestConstants.CUSTOM_KEY);
@@ -156,9 +157,8 @@ public class TestKafkaSink {
   }
 
   @Test
-  public void testEmptyChannel() throws UnsupportedEncodingException {
-
-
+  public void testEmptyChannel() throws UnsupportedEncodingException,
+          EventDeliveryException {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     Configurables.configure(kafkaSink, context);
@@ -167,25 +167,20 @@ public class TestKafkaSink {
     kafkaSink.setChannel(memoryChannel);
     kafkaSink.start();
 
-    try {
-      Sink.Status status = kafkaSink.process();
-      if (status == Sink.Status.BACKOFF) {
-        fail("Error Occurred");
-      }
-    } catch (EventDeliveryException ex) {
-      // ignore
+    Sink.Status status = kafkaSink.process();
+    if (status == Sink.Status.BACKOFF) {
+      fail("Error Occurred");
     }
     assertNull(
       testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
-
   }
 
-
   private Context prepareDefaultContext() {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
-    context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl());
+    context.put("brokerList", testUtil.getKafkaServerUrl());
     context.put("kafka.request.required.acks", "1");
+    context.put("kafka.producer.type","sync");
     context.put("batchSize", "1");
     return context;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
index c07cdea..02a81e2 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -1,4 +1,5 @@
 # Licensed to the Apache Software Foundation (ASF) under one or more
+# Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
index bdcb643..b86600b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
@@ -15,7 +15,7 @@
 
 kafka.logs.dir=target/logs
 
-log4j.rootLogger=INFO, stdout 
+log4j.rootLogger=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index da78f80..231ae42 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.ConsumerTimeoutException;
@@ -66,6 +67,7 @@ public class KafkaSource extends AbstractSource
   private int consumerTimeout;
   private boolean kafkaAutoCommitEnabled;
   private Context context;
+  private Properties kafkaProps;
   private final List<Event> eventList = new ArrayList<Event>();
 
   public Status process() throws EventDeliveryException {
@@ -122,6 +124,19 @@ public class KafkaSource extends AbstractSource
     }
   }
 
+  /**
+   * We configure the source and generate properties for the Kafka Consumer
+   *
+   * Kafka Consumer properties are generated as follows:
+   * 1. Generate a properties object with some static defaults that
+   * can be overridden by Source configuration
+   * 2. We add the configuration users added for Kafka (parameters starting
+   * with "kafka."), which must be valid Kafka Consumer properties
+   * 3. We add the source's documented parameters, which can override other
+   * properties
+   *
+   * @param context
+   */
   public void configure(Context context) {
     this.context = context;
     batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
@@ -130,21 +145,16 @@ public class KafkaSource extends AbstractSource
             KafkaSourceConstants.DEFAULT_BATCH_DURATION);
     topic = context.getString(KafkaSourceConstants.TOPIC);
 
-    //if consumer timeout and autocommit were not set by user,
-    // set them to 10ms and false
-    consumerTimeout = context.getInteger(KafkaSourceConstants.CONSUMER_TIMEOUT,
-            KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
-    context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
-            Integer.toString(consumerTimeout));
-    String autoCommit = context.getString(
-            KafkaSourceConstants.AUTO_COMMIT_ENABLED,
-            String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT));
-    kafkaAutoCommitEnabled = Boolean.valueOf(autoCommit);
-    context.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,autoCommit);
-
     if(topic == null) {
       throw new ConfigurationException("Kafka topic must be specified.");
     }
+
+    kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
+    consumerTimeout = Integer.parseInt(kafkaProps.getProperty(
+            KafkaSourceConstants.CONSUMER_TIMEOUT));
+    kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
+            KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+
   }
 
   @Override
@@ -153,7 +163,7 @@ public class KafkaSource extends AbstractSource
 
     try {
       //initialize a consumer. This creates the connection to ZooKeeper
-      consumer = KafkaSourceUtil.getConsumer(context);
+      consumer = KafkaSourceUtil.getConsumer(kafkaProps);
     } catch (Exception e) {
       throw new FlumeException("Unable to create consumer. " +
               "Check whether the ZooKeeper server is up and that the " +
@@ -192,6 +202,10 @@ public class KafkaSource extends AbstractSource
   }
 
 
+
+
+
+
   /**
    * Check if there are messages waiting in Kafka,
    * waiting until timeout (10ms by default) for messages to arrive.

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index ac86f65..169cc10 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -21,16 +21,19 @@ public class KafkaSourceConstants {
   public static final String TIMESTAMP = "timestamp";
   public static final String BATCH_SIZE = "batchSize";
   public static final String BATCH_DURATION_MS = "batchDurationMillis";
-  public static final String CONSUMER_TIMEOUT = "kafka.consumer.timeout.ms";
-  public static final String AUTO_COMMIT_ENABLED = "kafka.auto.commit.enabled";
-  public static final String ZOOKEEPER_CONNECT = "kafka.zookeeper.connect";
-  public static final String GROUP_ID = "kafka.group.id";
-  public static final String PROPERTY_PREFIX = "kafka";
+  public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
+  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled";
+  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+  public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect";
+  public static final String GROUP_ID = "group.id";
+  public static final String GROUP_ID_FLUME = "groupId";
+  public static final String PROPERTY_PREFIX = "kafka.";
 
 
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int DEFAULT_BATCH_DURATION = 1000;
-  public static final int DEFAULT_CONSUMER_TIMEOUT = 10;
+  public static final String DEFAULT_CONSUMER_TIMEOUT = "10";
   public static final boolean DEFAULT_AUTO_COMMIT =  false;
+  public static final String DEFAULT_GROUP_ID = "flume";
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
index 8397272..4a4034b 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
@@ -25,6 +25,7 @@ import kafka.consumer.ConsumerConfig;
 import kafka.javaapi.consumer.ConsumerConnector;
 
 import org.apache.flume.Context;
+import org.apache.flume.conf.ConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,32 +33,80 @@ public class KafkaSourceUtil {
   private static final Logger log =
           LoggerFactory.getLogger(KafkaSourceUtil.class);
 
-  public static Properties getKafkaConfigProperties(Context context) {
+  public static Properties getKafkaProperties(Context context) {
     log.info("context={}",context.toString());
-    Properties props = new Properties();
-    Map<String, String> contextMap = context.getParameters();
-    for(String key : contextMap.keySet()) {
-      String value = contextMap.get(key).trim();
-      key = key.trim();
-      if (key.startsWith(KafkaSourceConstants.PROPERTY_PREFIX)) {
-      // remove the prefix
-      key = key.substring(KafkaSourceConstants.PROPERTY_PREFIX.length() + 1,
-              key.length());
-        props.put(key, value);
-        if (log.isDebugEnabled()) {
-          log.debug("Reading a Kafka Producer Property: key: " + key +
-                  ", value: " + value);
-        }
-      }
-    }
+    Properties props =  generateDefaultKafkaProps();
+    setKafkaProps(context,props);
+    addDocumentedKafkaProps(context,props);
     return props;
   }
 
-  public static ConsumerConnector getConsumer(Context context) {
+  public static ConsumerConnector getConsumer(Properties kafkaProps) {
     ConsumerConfig consumerConfig =
-            new ConsumerConfig(getKafkaConfigProperties(context));
+            new ConsumerConfig(kafkaProps);
     ConsumerConnector consumer =
             Consumer.createJavaConsumerConnector(consumerConfig);
     return consumer;
   }
+
+  /**
+   * Generate a consumer properties object with some defaults
+   * @return the properties object, pre-populated with default values
+   */
+  private static Properties generateDefaultKafkaProps() {
+    Properties props = new Properties();
+    props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,
+            KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+    props.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
+            KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
+    props.put(KafkaSourceConstants.GROUP_ID,
+            KafkaSourceConstants.DEFAULT_GROUP_ID);
+    return props;
+  }
+
+  /**
+   * Add all configuration parameters starting with "kafka."
+   * to consumer properties
+   */
+  private static void setKafkaProps(Context context,Properties kafkaProps) {
+
+    Map<String,String> kafkaProperties =
+            context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX);
+
+    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
+
+      kafkaProps.put(prop.getKey(), prop.getValue());
+      if (log.isDebugEnabled()) {
+        log.debug("Reading a Kafka Producer Property: key: "
+                + prop.getKey() + ", value: " + prop.getValue());
+      }
+    }
+  }
+
+  /**
+   * Some of the consumer properties are especially important.
+   * We documented them and gave them a camel-case name to match Flume config.
+   * If the user sets these, we will override any existing parameters with
+   * these settings.
+   * Knowledge of which properties are documented is maintained here for now.
+   * If this becomes a maintenance issue we'll use a proper data structure.
+   */
+  private static void addDocumentedKafkaProps(Context context,
+                                              Properties kafkaProps)
+          throws ConfigurationException {
+    String zookeeperConnect = context.getString(
+            KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
+    if (zookeeperConnect == null) {
+      throw new ConfigurationException("ZookeeperConnect must contain " +
+              "at least one ZooKeeper server");
+    }
+    kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect);
+
+    String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
+
+    if (groupID != null ) {
+      kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID);
+    }
+  }
+
 }
\ No newline at end of file

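A short sketch of the resulting precedence (static defaults, then raw
"kafka."-prefixed pass-throughs, then documented camel-case keys); the
property values here are made up for illustration:

    import java.util.Properties;
    import org.apache.flume.Context;
    import org.apache.flume.source.kafka.KafkaSourceUtil;

    public class PrecedenceSketch {
      public static void main(String[] args) {
        Context context = new Context();
        context.put("zookeeperConnect", "zk1:2181");        // documented key: applied last, wins
        context.put("kafka.zookeeper.connect", "ignored");  // pass-through, overridden above
        context.put("kafka.consumer.timeout.ms", "100");    // pass-through, overrides default "10"
        Properties props = KafkaSourceUtil.getKafkaProperties(context);
        System.out.println(props.getProperty("zookeeper.connect"));   // zk1:2181
        System.out.println(props.getProperty("consumer.timeout.ms")); // 100
        System.out.println(props.getProperty("group.id"));            // flume (the default)
      }
    }
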
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
index 1009f1c..d067e24 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.*;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Properties;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import kafka.message.MessageAndMetadata;
 import org.apache.flume.*;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
 import org.junit.After;
 import org.junit.Before;
@@ -74,11 +76,11 @@ public class KafkaSourceTest {
     }
 
     context = new Context();
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
             kafkaServer.getZkConnectString());
-    context.put(KafkaSourceConstants.GROUP_ID,"flume");
+    context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume");
     context.put(KafkaSourceConstants.TOPIC,topicName);
-    context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,"100");
+    context.put("kafka.consumer.timeout.ms","100");
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
 
@@ -183,7 +185,7 @@ public class KafkaSourceTest {
   public void testNonExistingZk() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,"blabla:666");
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -192,4 +194,6 @@ public class KafkaSourceTest {
     assertEquals(Status.BACKOFF, status);
   }
 
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
index b9a1b25..f87e5ae 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
@@ -39,10 +39,12 @@ public class KafkaSourceUtilTest {
 
   @Before
   public void setUp() throws Exception {
-    context.put("consumer.timeout", "10");
+    context.put("kafka.consumer.timeout", "10");
     context.put("type", "KafkaSource");
     context.put("topic", "test");
-    props = KafkaSourceUtil.getKafkaConfigProperties(context);
+    context.put("zookeeperConnect", "127.0.0.1:"+zkPort);
+    context.put("groupId","test");
+    props = KafkaSourceUtil.getKafkaProperties(context);
     zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
 
 
@@ -53,23 +55,38 @@ public class KafkaSourceUtilTest {
     zookeeper.stopZookeeper();
   }
 
-  @Test
-  public void testGetKafkaConfigParameter() {
-    assertEquals("10",props.getProperty("consumer.timeout"));
-    assertEquals("test",props.getProperty("topic"));
-    assertNull(props.getProperty("type"));
-  }
-
 
   @Test
   public void testGetConsumer() {
-    context.put("zookeeper.connect", "127.0.0.1:"+zkPort);
-    context.put("group.id","test");
-
-    ConsumerConnector cc = KafkaSourceUtil.getConsumer(context);
+    ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
     assertNotNull(cc);
 
   }
 
+  @Test
+  public void testKafkaConsumerProperties() {
+    Context context = new Context();
+    context.put("kafka.auto.commit.enabled", "override.default.autocommit");
+    context.put("kafka.fake.property", "kafka.property.value");
+    context.put("kafka.zookeeper.connect","bad-zookeeper-list");
+    context.put("zookeeperConnect","real-zookeeper-list");
+    Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
+
+    //check that we have defaults set
+    assertEquals(
+            kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID),
+            KafkaSourceConstants.DEFAULT_GROUP_ID);
+    //check that kafka properties override the default and get correct name
+    assertEquals(
+            kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED),
+            "override.default.autocommit");
+    //check that any kafka property gets in
+    assertEquals(kafkaProps.getProperty("fake.property"),
+            "kafka.property.value");
+    //check that documented property overrides defaults
+    assertEquals(kafkaProps.getProperty("zookeeper.connect")
+            ,"real-zookeeper-list");
+  }
+
 
 }