Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/29 19:44:35 UTC

[1/3] flume git commit: FLUME-2821: Flume-Kafka Source with new Consumer

Repository: flume
Updated Branches:
  refs/heads/trunk 5293eba9a -> e8c4a7bff


FLUME-2821: Flume-Kafka Source with new Consumer

(Grigoriy Rozhkov via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f8abaf78
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f8abaf78
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f8abaf78

Branch: refs/heads/trunk
Commit: f8abaf78fb98e91b7a228aaa231f4164d8dcfc97
Parents: 5293eba
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:42:24 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:42:24 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  58 ++-
 flume-ng-sources/flume-kafka-source/pom.xml     |   5 +
 .../apache/flume/source/kafka/KafkaSource.java  | 398 +++++++++++++------
 .../source/kafka/KafkaSourceConstants.java      |  36 +-
 .../flume/source/kafka/KafkaSourceUtil.java     | 112 ------
 .../source/kafka/KafkaSourceEmbeddedKafka.java  |  96 +++--
 .../kafka/KafkaSourceEmbeddedZookeeper.java     |  17 +-
 .../flume/source/kafka/TestKafkaSource.java     | 281 ++++++++++---
 .../flume/source/kafka/TestKafkaSourceUtil.java |  92 -----
 pom.xml                                         |   5 +-
 10 files changed, 649 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 423e0cf..341ae42 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1188,9 +1188,9 @@ Example for agent named a1:
 Kafka Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
+Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
-so each will read a unique set of partitions for the topic.
+so each will read a unique set of partitions for the topics.
 
 
 
@@ -1198,11 +1198,13 @@ so each will read a unique set of partitions for the topic.
 Property Name                    Default      Description
 ===============================  ===========  ===================================================
 **channels**                     --
-**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource``
-**zookeeperConnect**             --           URI of ZooKeeper used by Kafka cluster
-**groupId**                      flume        Unique identified of consumer group. Setting the same id in multiple sources or agents
+**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**kafka.bootstrap.servers**      --           List of brokers in the Kafka cluster used by the source
+kafka.consumer.group.id          flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
                                               indicates that they are part of the same consumer group
-**topic**                        --           Kafka topic we'll read messages from. At the time, this is a single topic only.
+**kafka.topics**                 --           Comma-separated list of topics the Kafka consumer will read messages from.
+**kafka.topics.regex**           --           Regex that defines the set of topics the source is subscribed to. This property has higher priority
+                                              than ``kafka.topics`` and overrides ``kafka.topics`` if provided.
 batchSize                        1000         Maximum number of messages written to Channel in one batch
 batchDurationMillis              1000         Maximum time (in ms) before a batch will be written to Channel
                                               The batch will be written whenever the first of size and time will be reached.
@@ -1214,31 +1216,49 @@ maxBackoffSleep                  5000         Maximum wait time that is triggere
                                               ideal for ingestion use cases but a lower value may be required for low latency operations
                                               with interceptors.
 Other Kafka Consumer Properties  --           These properties are used to configure the Kafka Consumer. Any consumer property supported
-                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                              For example: kafka.consumer.timeout.ms
-                                              Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>` for details
+                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.consumer.``.
+                                              For example: kafka.consumer.auto.offset.reset
+                                              Check the `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
 ===============================  ===========  ===================================================
 
 .. note:: The Kafka Source overrides two Kafka consumer parameters:
-          auto.commit.enable is set to "false" by the source and we commit every batch. For improved performance
-          this can be set to "true", however, this can lead to loss of data
-          consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive
-          setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means
-          higher latency in writing batches to channel (since we'll wait longer for data to arrive).
+          auto.commit.enable is set to "false" by the source and every batch is committed. The Kafka Source guarantees
+          an at-least-once message retrieval strategy, so duplicates can be present when the source starts.
+          The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer)
+          and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modifying these parameters is not recommended.
 
+Deprecated Properties
 
-Example for agent named tier1:
+===============================  ===================  =============================================================================================
+Property Name                    Default              Description
+===============================  ===================  =============================================================================================
+topic                            --                   Use kafka.topics
+groupId                          flume                Use kafka.consumer.group.id
+zookeeperConnect                 --                   No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
+                                                      to establish a connection with the Kafka cluster
+===============================  ===================  =============================================================================================
+
+Example of topic subscription by comma-separated topic list:
 
 .. code-block:: properties
 
     tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
     tier1.sources.source1.channels = channel1
-    tier1.sources.source1.zookeeperConnect = localhost:2181
-    tier1.sources.source1.topic = test1
-    tier1.sources.source1.groupId = flume
-    tier1.sources.source1.kafka.consumer.timeout.ms = 100
+    tier1.sources.source1.batchSize = 5000
+    tier1.sources.source1.batchDurationMillis = 2000
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics = test1, test2
+    tier1.sources.source1.kafka.consumer.group.id = custom.g.id
 
+Example of topic subscription by regex:
 
+.. code-block:: properties
+
+    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    tier1.sources.source1.channels = channel1
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
+    # the default kafka.consumer.group.id=flume is used
 
 
 NetCat Source

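To complement the Kafka Source documentation above, here is a minimal, non-authoritative sketch of passing an arbitrary consumer property through the ``kafka.consumer.`` prefix. The agent, source, and channel names (tier1, source1, channel1) and the property values chosen are illustrative assumptions only.

.. code-block:: properties

    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source1.kafka.topics = test1
    # any valid Kafka consumer property can be prefixed with kafka.consumer.
    tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest
    tier1.sources.source1.kafka.consumer.session.timeout.ms = 30000
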
http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index 0f93476..5f5c2a8 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -62,6 +62,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <classifier>test</classifier>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index fd1dd3c..db806cc 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -17,40 +17,49 @@
 package org.apache.flume.source.kafka;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
-import org.apache.flume.*;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
-import org.apache.flume.source.AbstractSource;
-import org.apache.flume.source.BasicSourceSemantics;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * A Source for Kafka which reads messages from a kafka topic.
+ * A Source for Kafka which reads messages from Kafka topics.
  *
- * <tt>zookeeperConnect: </tt> Kafka's zookeeper connection string.
- * <b>Required</b>
+ * <tt>kafka.bootstrap.servers: </tt> A comma-separated list of host:port pairs
+ * to use for establishing the initial connection to the Kafka cluster.
+ * For example host1:port1,host2:port2,...
+ * <b>Required</b>.
  * <p>
- * <tt>groupId: </tt> the group ID of consumer group. <b>Required</b>
+ * <tt>kafka.consumer.group.id: </tt> the group ID of the consumer group. Default: flume
  * <p>
- * <tt>topic: </tt> the topic to consume messages from. <b>Required</b>
+ * <tt>kafka.topics: </tt> the comma-separated list of topics to consume messages from.
+ * <b>Required</b>
  * <p>
  * <tt>maxBatchSize: </tt> Maximum number of messages written to Channel in one
  * batch. Default: 1000
@@ -58,99 +67,167 @@ import org.slf4j.LoggerFactory;
  * <tt>maxBatchDurationMillis: </tt> Maximum number of milliseconds before a
  * batch (of any size) will be written to a channel. Default: 1000
  * <p>
- * <tt>kafka.auto.commit.enable: </tt> If true, commit automatically every time
- * period. if false, commit on each batch. Default: false
+ * <tt>kafka.consumer.*: </tt> Any property starting with "kafka.consumer." will be
+ * passed to the Kafka consumer, so you can use any configuration supported by Kafka 0.9.0.x.
  * <p>
- * <tt>kafka.consumer.timeout.ms: </tt> Polling interval for new data for batch.
- * Low value means more CPU usage. High value means the time.upper.limit may be
- * missed. Default: 10
- *
- * Any property starting with "kafka" will be passed to the kafka consumer So
- * you can use any configuration supported by Kafka 0.8.1.1
  */
 public class KafkaSource extends AbstractPollableSource
         implements Configurable {
   private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
-  private ConsumerConnector consumer;
-  private ConsumerIterator<byte[],byte[]> it;
-  private String topic;
-  private int batchUpperLimit;
-  private int timeUpperLimit;
-  private int consumerTimeout;
-  private boolean kafkaAutoCommitEnabled;
+
   private Context context;
   private Properties kafkaProps;
-  private final List<Event> eventList = new ArrayList<Event>();
   private KafkaSourceCounter counter;
+  private KafkaConsumer<String, byte[]> consumer;
+  private Iterator<ConsumerRecord<String, byte[]>> it;
+
+  private final List<Event> eventList = new ArrayList<Event>();
+  private Map<TopicPartition, OffsetAndMetadata> tpAndOffsetMetadata;
+  private AtomicBoolean rebalanceFlag;
+
+  private Map<String, String> headers;
+
+  private int batchUpperLimit;
+  private int maxBatchDurationMillis;
+
+  private Subscriber subscriber;
+
+
+  /**
+   * This class is a helper to subscribe to topics using
+   * different strategies.
+   */
+  public abstract class Subscriber<T> {
+    public abstract void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener);
+    public T get() {return null;}
+  }
+
+  private class TopicListSubscriber extends Subscriber<List<String>> {
+    private List<String> topicList;
+    public TopicListSubscriber(String commaSeparatedTopics) {
+      this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$"));
+    }
+    @Override
+    public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
+      consumer.subscribe(topicList, listener);
+    }
+    @Override
+    public List<String> get() {
+      return topicList;
+    }
+  }
+
+  private class PatternSubscriber extends Subscriber<Pattern> {
+    private Pattern pattern;
+    public PatternSubscriber(String regex) {
+      this.pattern = Pattern.compile(regex);
+    }
+    @Override
+    public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
+      consumer.subscribe(pattern, listener);
+    }
+    @Override
+    public Pattern get() {
+      return pattern;
+    }
+  }
+
 
   @Override
   protected Status doProcess() throws EventDeliveryException {
+    final String batchUUID = UUID.randomUUID().toString();
     byte[] kafkaMessage;
-    byte[] kafkaKey;
+    String kafkaKey;
     Event event;
-    Map<String, String> headers;
-    long batchStartTime = System.currentTimeMillis();
-    long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
+
     try {
-      boolean iterStatus = false;
-      long startTime = System.nanoTime();
+      // prepare time variables for new batch
+      final long nanoBatchStartTime = System.nanoTime();
+      final long batchStartTime = System.currentTimeMillis();
+      final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
       while (eventList.size() < batchUpperLimit &&
-              System.currentTimeMillis() < batchEndTime) {
-        iterStatus = hasNext();
-        if (iterStatus) {
-          // get next message
-          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
-          kafkaMessage = messageAndMetadata.message();
-          kafkaKey = messageAndMetadata.key();
-
-          // Add headers to event (topic, timestamp, and key)
-          headers = new HashMap<String, String>();
-          headers.put(KafkaSourceConstants.TIMESTAMP,
-                  String.valueOf(System.currentTimeMillis()));
-          headers.put(KafkaSourceConstants.TOPIC, topic);
-          if (kafkaKey != null) {
-            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+              System.currentTimeMillis() < maxBatchEndTime) {
+
+        if (it == null || !it.hasNext()) {
+          // Obtaining new records
+          // Poll time is remainder time for current batch.
+          ConsumerRecords<String, byte[]> records = consumer.poll(
+                  Math.max(0, maxBatchEndTime - System.currentTimeMillis()));
+          it = records.iterator();
+
+          // this flag is set to true in a callback when some partitions are revoked.
+          // If there are any records we commit them.
+          if (rebalanceFlag.get()) {
+            rebalanceFlag.set(false);
+            break;
           }
-          if (log.isDebugEnabled()) {
-            log.debug("Message: {}", new String(kafkaMessage));
+          // check records after poll
+          if (!it.hasNext()) {
+            if (log.isDebugEnabled()) {
+              counter.incrementKafkaEmptyCount();
+              log.debug("Returning with backoff. No more data to read");
+            }
+            // batch time exceeded
+            break;
           }
-          event = EventBuilder.withBody(kafkaMessage, headers);
-          eventList.add(event);
         }
+
+        // get next message
+        ConsumerRecord<String, byte[]> message = it.next();
+        kafkaKey = message.key();
+        kafkaMessage = message.value();
+
+        headers.clear();
+        // Add headers to event (timestamp, topic, partition, key)
+        headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
+        headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+        headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(message.partition()));
+        if (kafkaKey != null) {
+          headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
+        }
+
+        if (log.isDebugEnabled()) {
+          log.debug("Topic: {} Partition: {} Message: {}", new String[]{
+                  message.topic(),
+                  String.valueOf(message.partition()),
+                  new String(kafkaMessage)});
+        }
+
+        event = EventBuilder.withBody(kafkaMessage, headers);
+        eventList.add(event);
+
         if (log.isDebugEnabled()) {
           log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
           log.debug("Event #: {}", eventList.size());
         }
+
+        // For each partition store next offset that is going to be read.
+        tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
+                new OffsetAndMetadata(message.offset() + 1, batchUUID));
       }
-      long endTime = System.nanoTime();
-      counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
-      counter.addToEventReceivedCount(Long.valueOf(eventList.size()));
-      // If we have events, send events to channel
-      // clear the event list
-      // and commit if Kafka doesn't auto-commit
+
       if (eventList.size() > 0) {
+        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
+        counter.addToEventReceivedCount((long) eventList.size());
         getChannelProcessor().processEventBatch(eventList);
         counter.addToEventAcceptedCount(eventList.size());
-        eventList.clear();
         if (log.isDebugEnabled()) {
           log.debug("Wrote {} events to channel", eventList.size());
         }
-        if (!kafkaAutoCommitEnabled) {
-          // commit the read transactions to Kafka to avoid duplicates
+        eventList.clear();
+
+        if (!tpAndOffsetMetadata.isEmpty()) {
           long commitStartTime = System.nanoTime();
-          consumer.commitOffsets();
+          consumer.commitSync(tpAndOffsetMetadata);
           long commitEndTime = System.nanoTime();
-          counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000));
-        }
-      }
-      if (!iterStatus) {
-        if (log.isDebugEnabled()) {
-          counter.incrementKafkaEmptyCount();
-          log.debug("Returning with backoff. No more data to read");
+          counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
+          tpAndOffsetMetadata.clear();
         }
-        return Status.BACKOFF;
+        return Status.READY;
       }
-      return Status.READY;
+
+      return Status.BACKOFF;
     } catch (Exception e) {
       log.error("KafkaSource EXCEPTION, {}", e);
       return Status.BACKOFF;
@@ -161,96 +238,153 @@ public class KafkaSource extends AbstractPollableSource
    * We configure the source and generate properties for the Kafka Consumer
    *
    * Kafka Consumer properties are generated as follows:
-   *
    * 1. Generate a properties object with some static defaults that can be
-   * overridden by Source configuration 2. We add the configuration users added
-   * for Kafka (parameters starting with kafka. and must be valid Kafka Consumer
-   * properties 3. We add the source documented parameters which can override
-   * other properties
-   *
+   * overridden if corresponding properties are specified
+   * 2. We add the configuration users added for Kafka (parameters starting
+   * with kafka.consumer.), which must be valid Kafka Consumer properties
+   * 3. Add source level properties (with no prefix)
    * @param context
    */
   @Override
   protected void doConfigure(Context context) throws FlumeException {
     this.context = context;
+    headers = new HashMap<String, String>(4);
+    tpAndOffsetMetadata = new HashMap<TopicPartition, OffsetAndMetadata>();
+    rebalanceFlag = new AtomicBoolean(false);
+    kafkaProps = new Properties();
+
+    // can be removed in the next release
+    // See https://issues.apache.org/jira/browse/FLUME-2896
+    translateOldProperties(context);
+
+    String topicProperty = context.getString(KafkaSourceConstants.TOPICS_REGEX);
+    if (topicProperty != null && !topicProperty.isEmpty()) {
+      // create subscriber that uses pattern-based subscription
+      subscriber = new PatternSubscriber(topicProperty);
+    } else if ((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null
+        && !topicProperty.isEmpty()) {
+      // create subscriber that uses topic list subscription
+      subscriber = new TopicListSubscriber(topicProperty);
+    } else if (subscriber == null) {
+      throw new ConfigurationException("At least one Kafka topic must be specified.");
+    }
+
     batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
             KafkaSourceConstants.DEFAULT_BATCH_SIZE);
-    timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
+    maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
             KafkaSourceConstants.DEFAULT_BATCH_DURATION);
-    topic = context.getString(KafkaSourceConstants.TOPIC);
 
-    if(topic == null) {
-      throw new ConfigurationException("Kafka topic must be specified.");
+    String bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS);
+    if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
-    kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
-    consumerTimeout = Integer.parseInt(kafkaProps.getProperty(
-            KafkaSourceConstants.CONSUMER_TIMEOUT));
-    kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
-            KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+    setConsumerProps(context, bootstrapServers);
 
     if (counter == null) {
       counter = new KafkaSourceCounter(getName());
     }
   }
 
+
+  // We can remove this once the properties are officially deprecated
+  private void translateOldProperties(Context ctx) {
+    // topic
+    String topic = context.getString(KafkaSourceConstants.TOPIC);
+    if (topic != null && !topic.isEmpty()) {
+      subscriber = new TopicListSubscriber(topic);
+      log.warn("{} is deprecated. Please use the parameter {}",
+              KafkaSourceConstants.TOPIC, KafkaSourceConstants.TOPICS);
+    }
+
+    // old groupId
+    String groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID);
+    if (groupId != null && !groupId.isEmpty()) {
+      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+      log.warn("{} is deprecated. Please use the parameter {}",
+              KafkaSourceConstants.OLD_GROUP_ID,
+              KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    }
+  }
+
+
+  private void setConsumerProps(Context ctx, String bootStrapServers) {
+    String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    if ((groupId == null || groupId.isEmpty()) &&
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+        groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
+        log.info("Group ID was not specified. Using " + groupId + " as the group id.");
+    }
+    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
+    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
+    //Defaults overridden based on config
+    kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
+    //These always take precedence over config
+    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    if (groupId != null) {
+      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    }
+    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+
+    log.info(kafkaProps.toString());
+  }
+
+  Properties getConsumerProps() {
+    return kafkaProps;
+  }
+
+  <T> Subscriber<T> getSubscriber() {
+    return subscriber;
+  }
+
   @Override
   protected void doStart() throws FlumeException {
     log.info("Starting {}...", this);
 
-    try {
-      //initialize a consumer. This creates the connection to ZooKeeper
-      consumer = KafkaSourceUtil.getConsumer(kafkaProps);
-    } catch (Exception e) {
-      throw new FlumeException("Unable to create consumer. " +
-              "Check whether the ZooKeeper server is up and that the " +
-              "Flume agent can connect to it.", e);
-    }
+    //initialize a consumer.
+    consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
 
-    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    // We always have just one topic being read by one thread
-    topicCountMap.put(topic, 1);
+    // Subscribe to topics using the previously configured strategy
+    subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));
 
-    // Get the message iterator for our topic
-    // Note that this succeeds even if the topic doesn't exist
-    // in that case we simply get no messages for the topic
-    // Also note that currently we only support a single topic
-    try {
-      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-              consumer.createMessageStreams(topicCountMap);
-      List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
-      KafkaStream<byte[], byte[]> stream = topicList.get(0);
-      it = stream.iterator();
-    } catch (Exception e) {
-      throw new FlumeException("Unable to get message iterator from Kafka", e);
-    }
-    log.info("Kafka source {} do started.", getName());
+    // Connect to Kafka. The initial 1 second poll joins the consumer group and fetches partition assignments.
+    it = consumer.poll(1000).iterator();
+    log.info("Kafka source {} started.", getName());
     counter.start();
   }
 
   @Override
   protected void doStop() throws FlumeException {
     if (consumer != null) {
-      // exit cleanly. This syncs offsets of messages read to ZooKeeper
-      // to avoid reading the same messages again
-      consumer.shutdown();
+      consumer.wakeup();
+      consumer.close();
     }
     counter.stop();
-    log.info("Kafka Source {} do stopped. Metrics: {}", getName(), counter);
+    log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
   }
+}
 
-  /**
-   * Check if there are messages waiting in Kafka,
-   * waiting until timeout (10ms by default) for messages to arrive.
-   * and catching the timeout exception to return a boolean
-   */
-  boolean hasNext() {
-    try {
-      it.hasNext();
-      return true;
-    } catch (ConsumerTimeoutException e) {
-      return false;
+
+class SourceRebalanceListener implements ConsumerRebalanceListener {
+  private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class);
+  private AtomicBoolean rebalanceFlag;
+
+  public SourceRebalanceListener(AtomicBoolean rebalanceFlag) {
+    this.rebalanceFlag = rebalanceFlag;
+  }
+
+  // Set a flag that a rebalance has occurred. Then commit already read events to kafka.
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+      rebalanceFlag.set(true);
     }
   }
 
-}
\ No newline at end of file
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+    }
+  }
+}
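
Following the new KafkaSource implementation above, a hedged sketch of the consumer-group behavior described in the user guide: two sources configured with the same ``kafka.consumer.group.id`` should each be assigned a disjoint subset of the topic's partitions. All component names and values below are illustrative assumptions.

.. code-block:: properties

    # two sources in the same consumer group split the topic's partitions between them
    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source1.kafka.topics = test1
    tier1.sources.source1.kafka.consumer.group.id = shared.group

    tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source2.channels = channel1
    tier1.sources.source2.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source2.kafka.topics = test1
    tier1.sources.source2.kafka.consumer.group.id = shared.group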

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 911012c..2999cf2 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -16,25 +16,33 @@
  */
 package org.apache.flume.source.kafka;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+
 public class KafkaSourceConstants {
-  public static final String TOPIC = "topic";
-  public static final String KEY = "key";
-  public static final String TIMESTAMP = "timestamp";
+
+  public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String TOPICS = KAFKA_PREFIX + "topics";
+  public static final String TOPICS_REGEX = TOPICS + "." + "regex";
+  public static final String DEFAULT_AUTO_COMMIT =  "false";
   public static final String BATCH_SIZE = "batchSize";
   public static final String BATCH_DURATION_MS = "batchDurationMillis";
-  public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
-  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
-  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
-  public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect";
-  public static final String GROUP_ID = "group.id";
-  public static final String GROUP_ID_FLUME = "groupId";
-  public static final String PROPERTY_PREFIX = "kafka.";
-
-
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int DEFAULT_BATCH_DURATION = 1000;
-  public static final String DEFAULT_CONSUMER_TIMEOUT = "10";
-  public static final String DEFAULT_AUTO_COMMIT =  "false";
   public static final String DEFAULT_GROUP_ID = "flume";
 
+  /* Old Properties */
+
+  public static final String TOPIC = "topic";
+  public static final String OLD_GROUP_ID = "groupId";
+
+  // flume event headers
+  public static final String TOPIC_HEADER = "topic";
+  public static final String KEY_HEADER = "key";
+  public static final String TIMESTAMP_HEADER = "timestamp";
+  public static final String PARTITION_HEADER = "partition";
+
 }
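
The constants above define the Flume event headers (topic, key, timestamp, partition) that the source attaches to each event. As an illustrative sketch only, a downstream sink could reference these headers with Flume's %{header} escape sequences; the HDFS path and component names here are assumptions, not part of this patch.

.. code-block:: properties

    # route events into per-topic / per-partition directories using the
    # headers set by the Kafka Source
    tier1.sinks.sink1.type = hdfs
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.hdfs.path = /flume/kafka/%{topic}/%{partition}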

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
deleted file mode 100644
index 4a4034b..0000000
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.kafka;
-
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.common.KafkaException;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSourceUtil {
-  private static final Logger log =
-          LoggerFactory.getLogger(KafkaSourceUtil.class);
-
-  public static Properties getKafkaProperties(Context context) {
-    log.info("context={}",context.toString());
-    Properties props =  generateDefaultKafkaProps();
-    setKafkaProps(context,props);
-    addDocumentedKafkaProps(context,props);
-    return props;
-  }
-
-  public static ConsumerConnector getConsumer(Properties kafkaProps) {
-    ConsumerConfig consumerConfig =
-            new ConsumerConfig(kafkaProps);
-    ConsumerConnector consumer =
-            Consumer.createJavaConsumerConnector(consumerConfig);
-    return consumer;
-  }
-
-  /**
-   * Generate consumer properties object with some defaults
-   * @return
-   */
-  private static Properties generateDefaultKafkaProps() {
-    Properties props = new Properties();
-    props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,
-            KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
-    props.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
-            KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
-    props.put(KafkaSourceConstants.GROUP_ID,
-            KafkaSourceConstants.DEFAULT_GROUP_ID);
-    return props;
-  }
-
-  /**
-   * Add all configuration parameters starting with "kafka"
-   * to consumer properties
-   */
-  private static void setKafkaProps(Context context,Properties kafkaProps) {
-
-    Map<String,String> kafkaProperties =
-            context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX);
-
-    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
-
-      kafkaProps.put(prop.getKey(), prop.getValue());
-      if (log.isDebugEnabled()) {
-        log.debug("Reading a Kafka Producer Property: key: "
-                + prop.getKey() + ", value: " + prop.getValue());
-      }
-    }
-  }
-
-  /**
-   * Some of the producer properties are especially important
-   * We documented them and gave them a camel-case name to match Flume config
-   * If user set these, we will override any existing parameters with these
-   * settings.
-   * Knowledge of which properties are documented is maintained here for now.
-   * If this will become a maintenance issue we'll set a proper data structure.
-   */
-  private static void addDocumentedKafkaProps(Context context,
-                                              Properties kafkaProps)
-          throws ConfigurationException {
-    String zookeeperConnect = context.getString(
-            KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
-    if (zookeeperConnect == null) {
-      throw new ConfigurationException("ZookeeperConnect must contain " +
-              "at least one ZooKeeper server");
-    }
-    kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect);
-
-    String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
-
-    if (groupID != null ) {
-      kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 26c5c9d..46d545f 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -18,27 +18,59 @@ package org.apache.flume.source.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import kafka.admin.AdminUtils;
+import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
-import kafka.utils.ZKStringSerializer$;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 public class KafkaSourceEmbeddedKafka {
+
+  public static String HOST;
+
+  static {
+    try {
+      HOST = InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      throw new RuntimeException("Host address can not be obtained", e);
+    }
+  }
+
   KafkaServerStartable kafkaServer;
   KafkaSourceEmbeddedZookeeper zookeeper;
+
   int zkPort = 21818; // none-standard
-  Producer<String,String> producer;
+  int serverPort = 18922;
+
+  KafkaProducer<String, String> producer;
+  File dir;
 
-  public KafkaSourceEmbeddedKafka() {
+  public KafkaSourceEmbeddedKafka(Properties properties) {
     zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
+    dir = new File(System.getProperty("java.io.tmpdir"), "kafka_log-" + UUID.randomUUID());
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     Properties props = new Properties();
     props.put("zookeeper.connect",zookeeper.getConnectString());
     props.put("broker.id","1");
+    props.put("host.name", "localhost");
+    props.put("port", String.valueOf(serverPort));
+    props.put("log.dir", dir.getAbsolutePath());
+    if (properties != null) {
+      props.putAll(properties);
+    }
     KafkaConfig config = new KafkaConfig(props);
     kafkaServer = new KafkaServerStartable(config);
     kafkaServer.startup();
@@ -55,37 +87,49 @@ public class KafkaSourceEmbeddedKafka {
     return zookeeper.getConnectString();
   }
 
-  private void initProducer()
-  {
-    Properties props = new Properties();
-    props.put("metadata.broker.list","127.0.0.1:" +
-            kafkaServer.serverConfig().port());
-    props.put("serializer.class","kafka.serializer.StringEncoder");
-    props.put("request.required.acks", "1");
-
-    ProducerConfig config = new ProducerConfig(props);
-
-    producer = new Producer<String,String>(config);
+  public String getBrockers() {
+    return HOST + ":" + serverPort;
+  }
 
+  private void initProducer() {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", HOST + ":" + serverPort);
+    props.put("acks", "1");
+    producer = new KafkaProducer<String,String>(props,
+            new StringSerializer(), new StringSerializer());
   }
 
   public void produce(String topic, String k, String v) {
-    KeyedMessage<String,String> message = new KeyedMessage<String,String>(topic,k,v);
-    producer.send(message);
+    ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, k, v);
+    try {
+      producer.send(rec).get();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
   }
 
-  public void createTopic(String topicName) {
+  public void produce(String topic, int partition, String k, String v) {
+    ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, partition, k, v);
+    try {
+      producer.send(rec).get();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void createTopic(String topicName, int numPartitions) {
     // Create a ZooKeeper client
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(zookeeper.getConnectString(),
-            sessionTimeoutMs, connectionTimeoutMs,
-            ZKStringSerializer$.MODULE$);
-
-    int numPartitions = 1;
+    ZkClient zkClient = ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs);
+    ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkClient, topicName, numPartitions,
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
             replicationFactor, topicConfig);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
index 1b8a271..db144c2 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 public class KafkaSourceEmbeddedZookeeper {
   private int zkPort;
@@ -31,19 +32,25 @@ public class KafkaSourceEmbeddedZookeeper {
   File dir;
 
 
-  public KafkaSourceEmbeddedZookeeper(int zkPort){
-    int numConnections = 5000;
+  public KafkaSourceEmbeddedZookeeper(int zkPort) {
     int tickTime = 2000;
 
     this.zkPort = zkPort;
 
     String dataDirectory = System.getProperty("java.io.tmpdir");
-    dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
+    dir = new File(dataDirectory, "zookeeper" + UUID.randomUUID()).getAbsoluteFile();
+
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
 
     try {
       this.zookeeper = new ZooKeeperServer(dir,dir,tickTime);
       this.factory = new NIOServerCnxnFactory();
-      factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0);
+      factory.configure(new InetSocketAddress(KafkaSourceEmbeddedKafka.HOST, zkPort),0);
       factory.startup(zookeeper);
     } catch (IOException e) {
       e.printStackTrace();
@@ -59,6 +66,6 @@ public class KafkaSourceEmbeddedZookeeper {
   }
 
   public String getConnectString() {
-    return "127.0.0.1:"+zkPort;
+    return KafkaSourceEmbeddedKafka.HOST + ":" + zkPort;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 8ec14cc..8e04da8 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -23,17 +23,18 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import kafka.common.TopicExistsException;
-import kafka.consumer.ConsumerIterator;
-import kafka.message.Message;
 
 import org.apache.flume.*;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,42 +43,42 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+
 public class TestKafkaSource {
   private static final Logger log =
           LoggerFactory.getLogger(TestKafkaSource.class);
 
   private KafkaSource kafkaSource;
   private KafkaSourceEmbeddedKafka kafkaServer;
-  private ConsumerIterator<byte[], byte[]> mockIt;
-  private Message message;
   private Context context;
   private List<Event> events;
-  private String topicName = "test1";
-
+  private String topic0 = "test1";
+  private String topic1 = "topic1";
 
   @SuppressWarnings("unchecked")
   @Before
   public void setup() throws Exception {
-
     kafkaSource = new KafkaSource();
-    kafkaServer = new KafkaSourceEmbeddedKafka();
+    kafkaServer = new KafkaSourceEmbeddedKafka(null);
     try {
-      kafkaServer.createTopic(topicName);
+      kafkaServer.createTopic(topic0, 1);
+      kafkaServer.createTopic(topic1, 3);
     } catch (TopicExistsException e) {
       //do nothing
+      e.printStackTrace();
     }
-
-
-    context = new Context();
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
-            kafkaServer.getZkConnectString());
-    context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume");
-    context.put(KafkaSourceConstants.TOPIC,topicName);
-    context.put("kafka.consumer.timeout.ms","100");
-
+    context = prepareDefaultContext();
     kafkaSource.setChannelProcessor(createGoodChannel());
   }
 
+  private Context prepareDefaultContext() {
+    Context context = new Context();
+    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBrockers());
+    context.put(KAFKA_CONSUMER_PREFIX + "group.id", "flume-group");
+    return context;
+  }
+
   @After
   public void tearDown() throws Exception {
     kafkaSource.stop();
@@ -86,19 +87,89 @@ public class TestKafkaSource {
 
   @SuppressWarnings("unchecked")
   @Test
+  public void testOffsets() throws InterruptedException, EventDeliveryException {
+    long batchDuration = 2000;
+    context.put(TOPICS, topic1);
+    context.put(BATCH_DURATION_MS,
+            String.valueOf(batchDuration));
+    context.put(BATCH_SIZE, "3");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+    assertEquals(0, events.size());
+    kafkaServer.produce(topic1, "", "record1");
+    kafkaServer.produce(topic1, "", "record2");
+    Thread.sleep(500L);
+    status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    assertEquals(2, events.size());
+    events.clear();
+    kafkaServer.produce(topic1, "", "record3");
+    kafkaServer.produce(topic1, "", "record4");
+    kafkaServer.produce(topic1, "", "record5");
+    Thread.sleep(500L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record3", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record4", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record5", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record6");
+    kafkaServer.produce(topic1, "", "record7");
+    kafkaServer.produce(topic1, "", "record8");
+    kafkaServer.produce(topic1, "", "record9");
+    kafkaServer.produce(topic1, "", "record10");
+    Thread.sleep(500L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record6", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record7", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record8", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record11");
+    // status must be READY because the batch timeout was exceeded.
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record9", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record10", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record11", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record12");
+    kafkaServer.produce(topic1, "", "record13");
+    // stop kafka source
+    kafkaSource.stop();
+    // start again
+    kafkaSource = new KafkaSource();
+    kafkaSource.setChannelProcessor(createGoodChannel());
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    kafkaServer.produce(topic1, "", "record14");
+    Thread.sleep(1000L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record12", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record13", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record14", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
   public void testProcessItNotEmpty() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
 
     Thread.sleep(500L);
-
     Assert.assertEquals(Status.READY, kafkaSource.process());
     Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
     Assert.assertEquals(1, events.size());
@@ -112,14 +183,15 @@ public class TestKafkaSource {
   public void testProcessItNotEmptyBatch() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"2");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"2");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
-    kafkaServer.produce(topicName, "", "foo, bar");
+    kafkaServer.produce(topic0, "", "hello, world");
+    kafkaServer.produce(topic0, "", "foo, bar");
 
     Thread.sleep(500L);
 
@@ -138,6 +210,7 @@ public class TestKafkaSource {
   public void testProcessItEmpty() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
+    context.put(TOPICS, topic0);
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -151,7 +224,7 @@ public class TestKafkaSource {
   public void testNonExistingTopic() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.TOPIC,"faketopic");
+    context.put(TOPICS,"faketopic");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -162,10 +235,11 @@ public class TestKafkaSource {
 
   @SuppressWarnings("unchecked")
   @Test(expected= FlumeException.class)
-  public void testNonExistingZk() throws EventDeliveryException,
+  public void testNonExistingKafkaServer() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666");
+    context.put(TOPICS, topic0);
+    context.put(BOOTSTRAP_SERVERS,"blabla:666");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -177,37 +251,39 @@ public class TestKafkaSource {
   @Test
   public void testBatchTime() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_DURATION_MS, "250");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
     for (int i=1; i<5000; i++) {
-      kafkaServer.produce(topicName, "", "hello, world " + i);
+      kafkaServer.produce(topic0, "", "hello, world " + i);
     }
     Thread.sleep(500L);
 
+    long error = 50;
     long startTime = System.currentTimeMillis();
     Status status = kafkaSource.process();
     long endTime = System.currentTimeMillis();
     assertEquals(Status.READY, status);
     assertTrue(endTime - startTime <
-            ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) +
-            context.getLong("kafka.consumer.timeout.ms")) );
+            (context.getLong(BATCH_DURATION_MS) + error));
   }
 
   // Consume event, stop source, start again and make sure we are not
   // consuming same event again
   @Test
   public void testCommit() throws InterruptedException, EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
 
     Thread.sleep(500L);
 
@@ -224,14 +300,14 @@ public class TestKafkaSource {
   @Test
   public void testNonCommit() throws EventDeliveryException,
           InterruptedException {
-
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS,"30000");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
     Thread.sleep(500L);
 
     kafkaSource.setChannelProcessor(createBadChannel());
@@ -252,13 +328,14 @@ public class TestKafkaSource {
   @Test
   public void testTwoBatches() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS, "30000");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "event 1");
+    kafkaServer.produce(topic0, "", "event 1");
     Thread.sleep(500L);
 
     kafkaSource.process();
@@ -266,7 +343,7 @@ public class TestKafkaSource {
             Charsets.UTF_8));
     events.clear();
 
-    kafkaServer.produce(topicName, "", "event 2");
+    kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
     Assert.assertEquals("event 2", new String(events.get(0).getBody(),
@@ -276,14 +353,15 @@ public class TestKafkaSource {
   @Test
   public void testTwoBatchesWithAutocommit() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
-    context.put("kafka.auto.commit.enable","true");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS,"30000");
+    context.put(KAFKA_CONSUMER_PREFIX + "enable.auto.commit", "true");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "event 1");
+    kafkaServer.produce(topic0, "", "event 1");
     Thread.sleep(500L);
 
     kafkaSource.process();
@@ -291,7 +369,7 @@ public class TestKafkaSource {
             Charsets.UTF_8));
     events.clear();
 
-    kafkaServer.produce(topicName, "", "event 2");
+    kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
     Assert.assertEquals("event 2", new String(events.get(0).getBody(),
@@ -304,13 +382,14 @@ public class TestKafkaSource {
   public void testNullKey() throws EventDeliveryException,
       SecurityException, NoSuchFieldException, IllegalArgumentException,
       IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, null , "hello, world");
+    kafkaServer.produce(topic0, null, "hello, world");
 
     Thread.sleep(500L);
 
@@ -322,6 +401,110 @@ public class TestKafkaSource {
         Charsets.UTF_8));
   }
 
+  @Test
+  public void testSourceProperties() {
+    Context context = new Context();
+    context.put(TOPICS, "test1, test2");
+    context.put(TOPICS_REGEX, "^stream[0-9]$");
+    context.put(BOOTSTRAP_SERVERS, "bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+
+    //check that kafka.topics.regex has higher priority than topics
+    //type of subscriber should be PatternSubscriber
+    KafkaSource.Subscriber<Pattern> subscriber = source.getSubscriber();
+    Pattern pattern = subscriber.get();
+    Assert.assertTrue(pattern.matcher("stream1").find());
+  }
+
+  @Test
+  public void testKafkaProperties() {
+    Context context = new Context();
+    context.put(TOPICS, "test1, test2");
+    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.default.group.id");
+    context.put(KAFKA_CONSUMER_PREFIX + "fake.property", "kafka.property.value");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    context.put(KAFKA_CONSUMER_PREFIX + "bootstrap.servers", "bad-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    Properties kafkaProps = source.getConsumerProps();
+
+    //check that we have defaults set
+    assertEquals(
+            String.valueOf(DEFAULT_AUTO_COMMIT),
+            kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    //check that kafka properties override the default and get correct name
+    assertEquals(
+            "override.default.group.id",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    //check that any kafka property gets in
+    assertEquals(
+            "kafka.property.value",
+            kafkaProps.getProperty("fake.property"));
+    //check that documented property overrides defaults
+    assertEquals(
+            "real-bootstrap-servers-list",
+            kafkaProps.getProperty("bootstrap.servers"));
+  }
+
+  @Test
+  public void testOldProperties() {
+    Context context = new Context();
+
+    context.put(TOPIC, "old.topic");
+    context.put(OLD_GROUP_ID, "old.groupId");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    Properties kafkaProps = source.getConsumerProps();
+
+    KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber();
+    //check topic was set
+    assertEquals(
+            "old.topic",
+            subscriber.get().get(0));
+    //check that kafka old properties override the default and get correct name
+    assertEquals(
+            "old.groupId",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+
+    source = new KafkaSource();
+    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id");
+    source.doConfigure(context);
+    kafkaProps = source.getConsumerProps();
+    //check that kafka new properties override old
+    assertEquals(
+            "override.old.group.id",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+
+    context.clear();
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    context.put(TOPIC, "old.topic");
+    source = new KafkaSource();
+    source.doConfigure(context);
+    kafkaProps = source.getConsumerProps();
+    //check defaults set
+    assertEquals(
+            KafkaSourceConstants.DEFAULT_GROUP_ID,
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+  }
+
+  @Test
+  public void testPatternBasedSubscription() {
+    Context context = new Context();
+
+    context.put(TOPICS_REGEX, "^topic[0-9]$");
+    context.put(OLD_GROUP_ID, "old.groupId");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    KafkaSource.Subscriber<Pattern> subscriber = source.getSubscriber();
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(subscriber.get().matcher("topic" + i).find());
+    }
+    Assert.assertFalse(subscriber.get().matcher("topic").find());
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
@@ -352,4 +535,4 @@ public class TestKafkaSource {
 
     return channelProcessor;
   }
-}
\ No newline at end of file
+}
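
For reference, the property keys exercised by the property-handling tests above map onto an agent configuration roughly as follows. This is a minimal sketch and not part of the patch: the agent and component names are illustrative, the kafka.topics key is inferred from the TOPICS constant used in the tests, while kafka.bootstrap.servers, kafka.consumer.group.id and kafka.topics.regex come from the patch itself.

    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.channels = c1
    a1.sources.r1.kafka.bootstrap.servers = broker1:9092,broker2:9092
    a1.sources.r1.kafka.topics = topic0, topic1
    # per testSourceProperties, kafka.topics.regex takes priority over kafka.topics when both are set
    a1.sources.r1.kafka.topics.regex = ^stream[0-9]$
    a1.sources.r1.kafka.consumer.group.id = flume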

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
deleted file mode 100644
index 0cbb4b6..0000000
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.source.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.flume.Context;
-import org.apache.zookeeper.server.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestKafkaSourceUtil {
-  private Properties props = new Properties();
-  private Context context = new Context();
-  private int zkPort = 21818; // none-standard
-  private KafkaSourceEmbeddedZookeeper zookeeper;
-
-  @Before
-  public void setUp() throws Exception {
-    context.put("kafka.consumer.timeout", "10");
-    context.put("type", "KafkaSource");
-    context.put("topic", "test");
-    context.put("zookeeperConnect", "127.0.0.1:"+zkPort);
-    context.put("groupId","test");
-    props = KafkaSourceUtil.getKafkaProperties(context);
-    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
-
-
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    zookeeper.stopZookeeper();
-  }
-
-
-  @Test
-  public void testGetConsumer() {
-    ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
-    assertNotNull(cc);
-
-  }
-
-  @Test
-  public void testKafkaConsumerProperties() {
-    Context context = new Context();
-    context.put("kafka.auto.commit.enable", "override.default.autocommit");
-    context.put("kafka.fake.property", "kafka.property.value");
-    context.put("kafka.zookeeper.connect","bad-zookeeper-list");
-    context.put("zookeeperConnect","real-zookeeper-list");
-    Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
-
-    //check that we have defaults set
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID),
-            KafkaSourceConstants.DEFAULT_GROUP_ID);
-    //check that kafka properties override the default and get correct name
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED),
-            "override.default.autocommit");
-    //check that any kafka property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"),
-            "kafka.property.value");
-    //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("zookeeper.connect")
-            ,"real-zookeeper-list");
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 15c086b..3b2c97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@ limitations under the License.
     <elasticsearch.version>0.90.1</elasticsearch.version>
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
+    <kafka.version>0.9.0.1</kafka.version>
     <kite.version>1.0.0</kite.version>
     <hive.version>1.0.0</hive.version>
     <xalan.verion>2.7.1</xalan.verion>
@@ -1337,12 +1338,12 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
-        <version>0.8.1.1</version>
+        <version>${kafka.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
-        <version>0.8.1.1</version>
+        <version>${kafka.version}</version>
         <classifier>test</classifier>
         <scope>test</scope>
       </dependency>


[3/3] flume git commit: FLUME-2823: Flume-Kafka-Channel with new APIs

Posted by ja...@apache.org.
FLUME-2823: Flume-Kafka-Channel with new APIs

(Jeff Holoman via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e8c4a7bf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e8c4a7bf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e8c4a7bf

Branch: refs/heads/trunk
Commit: e8c4a7bffc74f6ea10ae6cc45adbaf4919f45186
Parents: 7f588e6
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:45:40 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:45:40 2016 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |   6 +
 .../flume/channel/kafka/KafkaChannel.java       | 641 ++++++++++++-------
 .../kafka/KafkaChannelConfiguration.java        |  32 +-
 .../flume/channel/kafka/TestKafkaChannel.java   | 219 ++++---
 .../src/test/resources/kafka-server.properties  |   2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  88 ++-
 6 files changed, 634 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
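
As a rough illustration of the new-style channel configuration this patch enables (a sketch only, with illustrative agent and channel names; the keys are taken from the KafkaChannelConfiguration constants further down in this commit, and any other kafka.producer.* or kafka.consumer.* keys are passed through to the Kafka clients):

    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = broker1:9092,broker2:9092
    a1.channels.c1.kafka.topic = flume-channel
    a1.channels.c1.kafka.consumer.group.id = flume
    a1.channels.c1.parseAsFlumeEvent = true
    # example of a pass-through producer property
    a1.channels.c1.kafka.producer.acks = all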


http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index fa1bd42..587b4b4 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -40,6 +40,12 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index c0c1c66..2d9b0c6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,108 +20,120 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import kafka.consumer.*;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.avro.io.*;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.conf.ConfigurationException;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
 public class KafkaChannel extends BasicChannelSemantics {
 
-  private final static Logger LOGGER =
-    LoggerFactory.getLogger(KafkaChannel.class);
+  private final static Logger logger =
+          LoggerFactory.getLogger(KafkaChannel.class);
 
+  private final Properties consumerProps = new Properties();
+  private final Properties producerProps = new Properties();
 
-  private final Properties kafkaConf = new Properties();
-  private Producer<String, byte[]> producer;
+  private KafkaProducer<String, byte[]> producer;
   private final String channelUUID = UUID.randomUUID().toString();
 
   private AtomicReference<String> topic = new AtomicReference<String>();
   private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
-  private final Map<String, Integer> topicCountMap =
-    Collections.synchronizedMap(new HashMap<String, Integer>());
+
+  //used to indicate if a rebalance has occurred during the current transaction
+  AtomicBoolean rebalanceFlag = new AtomicBoolean();
+  //This isn't a Kafka property per se, but we allow it to be configurable
+  private long pollTimeout = DEFAULT_POLL_TIMEOUT;
+
 
   // Track all consumers to close them eventually.
-  private final List<ConsumerAndIterator> consumers =
-    Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
+  private final List<ConsumerAndRecords> consumers =
+          Collections.synchronizedList(new LinkedList<ConsumerAndRecords>());
 
   private KafkaChannelCounter counter;
 
-  /* Each ConsumerConnector commit will commit all partitions owned by it. To
+   /* Each Consumer commit will commit all partitions owned by it. To
    * ensure that each partition is only committed when all events are
-   * actually done, we will need to keep a ConsumerConnector per thread.
-   * See Neha's answer here:
-   * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic
-   * Since only one consumer connector will a partition at any point in time,
-   * when we commit the partition we would have committed all events to the
-   * final destination from that partition.
-   *
-   * If a new partition gets assigned to this connector,
-   * my understanding is that all message from the last partition commit will
-   * get replayed which may cause duplicates -- which is fine as this
-   * happens only on partition rebalancing which is on failure or new nodes
-   * coming up, which is rare.
+   * actually done, we will need to keep a Consumer per thread.
    */
-  private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new
-    ThreadLocal<ConsumerAndIterator>() {
-      @Override
-      public ConsumerAndIterator initialValue() {
-        return createConsumerAndIter();
-      }
-    };
+
+  private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new
+          ThreadLocal<ConsumerAndRecords>() {
+            @Override
+            public ConsumerAndRecords initialValue() {
+              return createConsumerAndRecords();
+            }
+          };
 
   @Override
   public void start() {
-    try {
-      LOGGER.info("Starting Kafka Channel: " + getName());
-      producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf));
+      logger.info("Starting Kafka Channel: {}", getName());
+      producer = new KafkaProducer<String, byte[]>(producerProps);
       // We always have just one topic being read by one thread
-      LOGGER.info("Topic = " + topic.get());
-      topicCountMap.put(topic.get(), 1);
+      logger.info("Topic = {}", topic.get());
       counter.start();
       super.start();
-    } catch (Exception e) {
-      LOGGER.error("Could not start producer");
-      throw new FlumeException("Unable to create Kafka Connections. " +
-        "Check whether Kafka Brokers are up and that the " +
-        "Flume agent can connect to it.", e);
-    }
   }
 
   @Override
   public void stop() {
-    for (ConsumerAndIterator c : consumers) {
+    for (ConsumerAndRecords c : consumers) {
       try {
-        decommissionConsumerAndIterator(c);
+        decommissionConsumerAndRecords(c);
       } catch (Exception ex) {
-        LOGGER.warn("Error while shutting down consumer.", ex);
+        logger.warn("Error while shutting down consumer.", ex);
       }
     }
     producer.close();
     counter.stop();
     super.stop();
-    LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(),
-        counter);
+    logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
+            counter);
   }
 
   @Override
@@ -129,98 +141,147 @@ public class KafkaChannel extends BasicChannelSemantics {
     return new KafkaTransaction();
   }
 
-  private synchronized ConsumerAndIterator createConsumerAndIter() {
-    try {
-      ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf);
-      ConsumerConnector consumer =
-        Consumer.createJavaConsumerConnector(consumerConfig);
-      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-        consumer.createMessageStreams(topicCountMap);
-      final List<KafkaStream<byte[], byte[]>> streamList = consumerMap
-        .get(topic.get());
-      KafkaStream<byte[], byte[]> stream = streamList.remove(0);
-      ConsumerAndIterator ret =
-        new ConsumerAndIterator(consumer, stream.iterator(), channelUUID);
-      consumers.add(ret);
-      LOGGER.info("Created new consumer to connect to Kafka");
-      return ret;
-    } catch (Exception e) {
-      throw new FlumeException("Unable to connect to Kafka", e);
-    }
-  }
-
-  Properties getKafkaConf() {
-    return kafkaConf;
-  }
-
   @Override
   public void configure(Context ctx) {
-    String topicStr = ctx.getString(TOPIC);
+
+    //Can remove in the next release
+    translateOldProps(ctx);
+
+    String topicStr = ctx.getString(TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
-      LOGGER
-        .info("Topic was not specified. Using " + topicStr + " as the topic.");
+      logger.info("Topic was not specified. Using {} as the topic.", topicStr);
     }
     topic.set(topicStr);
-    String groupId = ctx.getString(GROUP_ID_FLUME);
-    if (groupId == null || groupId.isEmpty()) {
-      groupId = DEFAULT_GROUP_ID;
-      LOGGER.info(
-        "Group ID was not specified. Using " + groupId + " as the group id.");
+    String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
+    if (bootStrapServers == null || bootStrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
-    String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
-    if (brokerList == null || brokerList.isEmpty()) {
-      throw new ConfigurationException("Broker List must be specified");
+
+    setProducerProps(ctx, bootStrapServers);
+    setConsumerProps(ctx, bootStrapServers);
+
+    parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
+    pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
+
+    if (counter == null) {
+      counter = new KafkaChannelCounter(getName());
     }
-    String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
-    if (zkConnect == null || zkConnect.isEmpty()) {
-      throw new ConfigurationException(
-        "Zookeeper Connection must be specified");
+  }
+
+  // We can remove this once the properties are officially deprecated
+  private void translateOldProps(Context ctx) {
+
+    if (!(ctx.containsKey(TOPIC_CONFIG))) {
+      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
+      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
     }
-    kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
-    kafkaConf.put(GROUP_ID, groupId);
-    kafkaConf.put(BROKER_LIST_KEY, brokerList);
-    kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
-    kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
-    if(kafkaConf.get(CONSUMER_TIMEOUT) == null) {
-      kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT);
+
+    //Broker List
+    // If there is no value we need to check and set the old param and log a warning message
+    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
+      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+      if (brokerList == null || brokerList.isEmpty()) {
+        throw new ConfigurationException("Bootstrap Servers must be specified");
+      } else {
+        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+      }
     }
-    kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
-    LOGGER.info(kafkaConf.toString());
-    parseAsFlumeEvent =
-      ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
-
-    boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET,
-      DEFAULT_READ_SMALLEST_OFFSET);
-    // If the data is to be parsed as Flume events, we always read the smallest.
-    // Else, we read the configuration, which by default reads the largest.
-    if (parseAsFlumeEvent || readSmallest) {
-      // readSmallest is eval-ed only if parseAsFlumeEvent is false.
-      // The default is largest, so we don't need to set it explicitly.
-      kafkaConf.put("auto.offset.reset", "smallest");
+
+    //GroupId
+    // If there is an old Group Id set, then use that if no groupId is set.
+    if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
+      String oldGroupId = ctx.getString(GROUP_ID_FLUME);
+      if (oldGroupId != null && !oldGroupId.isEmpty()) {
+        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
+        logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+      }
     }
 
-    if (counter == null) {
-      counter = new KafkaChannelCounter(getName());
+    if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) {
+      Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
+      String auto;
+      if (oldReadSmallest != null) {
+        if (oldReadSmallest) {
+          auto = "earliest";
+        } else {
+          auto = "latest";
+        }
+        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
+        logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+      }
+
+    }
+  }
+
+
+  private void setProducerProps(Context ctx, String bootStrapServers) {
+    producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
+    //Defaults overridden based on config
+    producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    logger.info("Producer properties: " + producerProps.toString());
+  }
+
+  protected Properties getProducerProps() {
+    return producerProps;
+  }
+
+  private void setConsumerProps(Context ctx, String bootStrapServers) {
+    String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    if (groupId == null || groupId.isEmpty()) {
+      groupId = DEFAULT_GROUP_ID;
+      logger.info("Group ID was not specified. Using {} as the group id.", groupId);
     }
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
+    //Defaults overridden based on config
+    consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
+    //These always take precedence over config
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    logger.info(consumerProps.toString());
+  }
 
+  protected Properties getConsumerProps() { return consumerProps; }
+
+
+  private synchronized ConsumerAndRecords createConsumerAndRecords() {
+    try {
+      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
+      ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
+      logger.info("Created new consumer to connect to Kafka");
+      car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
+      car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+      consumers.add(car);
+      return car;
+    } catch (Exception e) {
+      throw new FlumeException("Unable to connect to Kafka", e);
+    }
   }
 
-  private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
+  private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
     if (c.failedEvents.isEmpty()) {
-      c.consumer.commitOffsets();
+      c.commitOffsets();
     }
     c.failedEvents.clear();
-    c.consumer.shutdown();
+    c.consumer.close();
   }
 
-  // Force a consumer to be initialized. There are  many duplicates in
-  // tests due to rebalancing - making testing tricky. In production,
-  // this is less of an issue as
-  // rebalancing would happen only on startup.
   @VisibleForTesting
   void registerThread() {
-    consumerAndIter.get();
+    try {
+      consumerAndRecords.get();
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+      e.printStackTrace();
+    }
   }
 
   private enum TransactionType {
@@ -233,54 +294,41 @@ public class KafkaChannel extends BasicChannelSemantics {
   private class KafkaTransaction extends BasicTransactionSemantics {
 
     private TransactionType type = TransactionType.NONE;
-    // For Puts
     private Optional<ByteArrayOutputStream> tempOutStream = Optional
-      .absent();
-
-    // For put transactions, serialize the events and batch them and send it.
-    private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent();
+            .absent();
+    // For put transactions, serialize the events and hold them until the commit is requested.
+    private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent();
     // For take transactions, deserialize and hold them till commit goes through
     private Optional<LinkedList<Event>> events = Optional.absent();
     private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
-      Optional.absent();
+            Optional.absent();
     private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
-      Optional.absent();
+            Optional.absent();
+    private Optional<LinkedList<Future<RecordMetadata>>> kafkaFutures =
+            Optional.absent();
+    private final String batchUUID = UUID.randomUUID().toString();
 
     // Fine to use null for initial value, Avro will create new ones if this
     // is null
     private BinaryEncoder encoder = null;
     private BinaryDecoder decoder = null;
-    private final String batchUUID = UUID.randomUUID().toString();
     private boolean eventTaken = false;
 
     @Override
+    protected void doBegin() throws InterruptedException {
+      rebalanceFlag.set(false);
+    }
+
+    @Override
     protected void doPut(Event event) throws InterruptedException {
       type = TransactionType.PUT;
-      if (!serializedEvents.isPresent()) {
-        serializedEvents = Optional.of(new LinkedList<byte[]>());
+      if (!producerRecords.isPresent()) {
+        producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
       }
-
+      String key = event.getHeaders().get(KEY_HEADER);
       try {
-        if (parseAsFlumeEvent) {
-          if (!tempOutStream.isPresent()) {
-            tempOutStream = Optional.of(new ByteArrayOutputStream());
-          }
-          if (!writer.isPresent()) {
-            writer = Optional.of(new
-              SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
-          }
-          tempOutStream.get().reset();
-          AvroFlumeEvent e = new AvroFlumeEvent(
-            toCharSeqMap(event.getHeaders()),
-            ByteBuffer.wrap(event.getBody()));
-          encoder = EncoderFactory.get()
-            .directBinaryEncoder(tempOutStream.get(), encoder);
-          writer.get().write(e, encoder);
-          // Not really possible to avoid this copy :(
-          serializedEvents.get().add(tempOutStream.get().toByteArray());
-        } else {
-          serializedEvents.get().add(event.getBody());
-        }
+        producerRecords.get().add(new ProducerRecord<String, byte[]>
+                (topic.get(), key, serializeValue(event, parseAsFlumeEvent)));
       } catch (Exception e) {
         throw new ChannelException("Error while serializing event", e);
       }
@@ -291,53 +339,64 @@ public class KafkaChannel extends BasicChannelSemantics {
     protected Event doTake() throws InterruptedException {
       type = TransactionType.TAKE;
       try {
-        if (!(consumerAndIter.get().uuid.equals(channelUUID))) {
-          LOGGER.info("UUID mismatch, creating new consumer");
-          decommissionConsumerAndIterator(consumerAndIter.get());
-          consumerAndIter.remove();
+        if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
+          logger.info("UUID mismatch, creating new consumer");
+          decommissionConsumerAndRecords(consumerAndRecords.get());
+          consumerAndRecords.remove();
         }
       } catch (Exception ex) {
-        LOGGER.warn("Error while shutting down consumer", ex);
+        logger.warn("Error while shutting down consumer", ex);
       }
       if (!events.isPresent()) {
         events = Optional.of(new LinkedList<Event>());
       }
       Event e;
-      if (!consumerAndIter.get().failedEvents.isEmpty()) {
-        e = consumerAndIter.get().failedEvents.removeFirst();
+      // Give the channel a chance to commit if there has been a rebalance
+      if (rebalanceFlag.get()) {
+        logger.debug("Returning null event after Consumer rebalance.");
+        return null;
+      }
+      if (!consumerAndRecords.get().failedEvents.isEmpty()) {
+        e = consumerAndRecords.get().failedEvents.removeFirst();
       } else {
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("Assignment: {}", consumerAndRecords.get().consumer.assignment().toString());
+        }
+
         try {
-          ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
           long startTime = System.nanoTime();
-          it.hasNext();
-          long endTime = System.nanoTime();
-          counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
-          if (parseAsFlumeEvent) {
-            ByteArrayInputStream in =
-              new ByteArrayInputStream(it.next().message());
-            decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
-            if (!reader.isPresent()) {
-              reader = Optional.of(
-                new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
-            }
-            AvroFlumeEvent event = reader.get().read(null, decoder);
-            e = EventBuilder.withBody(event.getBody().array(),
-              toStringMap(event.getHeaders()));
-          } else {
-            e = EventBuilder.withBody(it.next().message(),
-              Collections.EMPTY_MAP);
+          if (!consumerAndRecords.get().recordIterator.hasNext()) {
+            consumerAndRecords.get().poll();
           }
+          if (consumerAndRecords.get().recordIterator.hasNext()) {
+            ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
+            e = deserializeValue(record.value(), parseAsFlumeEvent);
+            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+            OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
+            consumerAndRecords.get().offsets.put(tp, oam);
+
+            if (logger.isTraceEnabled()) {
+              logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
+            }
 
-        } catch (ConsumerTimeoutException ex) {
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Timed out while waiting for data to come from Kafka",
-              ex);
+            //Add the key to the header
+            e.getHeaders().put(KEY_HEADER, record.key());
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
+            }
+
+            long endTime = System.nanoTime();
+            counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+          } else {
+            return null;
           }
-          return null;
         } catch (Exception ex) {
-          LOGGER.warn("Error while getting events from Kafka", ex);
+          logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " +
+                  "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex);
           throw new ChannelException("Error while getting events from Kafka",
-            ex);
+                  ex);
         }
       }
       eventTaken = true;
@@ -351,32 +410,41 @@ public class KafkaChannel extends BasicChannelSemantics {
         return;
       }
       if (type.equals(TransactionType.PUT)) {
+        if (!kafkaFutures.isPresent()) {
+          kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
+        }
         try {
-          List<KeyedMessage<String, byte[]>> messages = new
-            ArrayList<KeyedMessage<String, byte[]>>(serializedEvents.get()
-            .size());
-          for (byte[] event : serializedEvents.get()) {
-            messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
-              batchUUID, event));
-          }
+          long batchSize = producerRecords.get().size();
           long startTime = System.nanoTime();
-          producer.send(messages);
+          int index = 0;
+          for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
+            index++;
+            kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
+          }
+          // flush so that linger.ms does not delay delivery of this batch
+          producer.flush();
+
+          for (Future<RecordMetadata> future : kafkaFutures.get()) {
+            future.get();
+          }
           long endTime = System.nanoTime();
-          counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
-          counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
-          serializedEvents.get().clear();
+          counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
+          counter.addToEventPutSuccessCount(batchSize);
+          producerRecords.get().clear();
+          kafkaFutures.get().clear();
         } catch (Exception ex) {
-          LOGGER.warn("Sending events to Kafka failed", ex);
+          logger.warn("Sending events to Kafka failed", ex);
           throw new ChannelException("Commit failed as send to Kafka failed",
-            ex);
+                  ex);
         }
       } else {
-        if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+        if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
           long startTime = System.nanoTime();
-          consumerAndIter.get().consumer.commitOffsets();
+          consumerAndRecords.get().commitOffsets();
           long endTime = System.nanoTime();
-          counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000));
-         }
+          counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
+          consumerAndRecords.get().printCurrentAssignment();
+        }
         counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
         events.get().clear();
       }
@@ -388,37 +456,66 @@ public class KafkaChannel extends BasicChannelSemantics {
         return;
       }
       if (type.equals(TransactionType.PUT)) {
-        serializedEvents.get().clear();
+        producerRecords.get().clear();
+        kafkaFutures.get().clear();
       } else {
         counter.addToRollbackCounter(Long.valueOf(events.get().size()));
-        consumerAndIter.get().failedEvents.addAll(events.get());
+        consumerAndRecords.get().failedEvents.addAll(events.get());
         events.get().clear();
       }
     }
-  }
-
 
-  private class ConsumerAndIterator {
-    final ConsumerConnector consumer;
-    final ConsumerIterator<byte[], byte[]> iterator;
-    final String uuid;
-    final LinkedList<Event> failedEvents = new LinkedList<Event>();
+    private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOException {
+      byte[] bytes;
+      if (parseAsFlumeEvent) {
+        if (!tempOutStream.isPresent()) {
+          tempOutStream = Optional.of(new ByteArrayOutputStream());
+        }
+        if (!writer.isPresent()) {
+          writer = Optional.of(new
+                  SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+        }
+        tempOutStream.get().reset();
+        AvroFlumeEvent e = new AvroFlumeEvent(
+                toCharSeqMap(event.getHeaders()),
+                ByteBuffer.wrap(event.getBody()));
+        encoder = EncoderFactory.get()
+                .directBinaryEncoder(tempOutStream.get(), encoder);
+        writer.get().write(e, encoder);
+        encoder.flush();
+        bytes = tempOutStream.get().toByteArray();
+      } else {
+        bytes = event.getBody();
+      }
+      return bytes;
+    }
 
-    ConsumerAndIterator(ConsumerConnector consumerConnector,
-      ConsumerIterator<byte[], byte[]> iterator, String uuid) {
-      this.consumer = consumerConnector;
-      this.iterator = iterator;
-      this.uuid = uuid;
+    private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
+      Event e;
+      if (parseAsFlumeEvent) {
+        ByteArrayInputStream in =
+                new ByteArrayInputStream(value);
+        decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+        if (!reader.isPresent()) {
+          reader = Optional.of(
+                  new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
+        }
+        AvroFlumeEvent event = reader.get().read(null, decoder);
+        e = EventBuilder.withBody(event.getBody().array(),
+                toStringMap(event.getHeaders()));
+      } else {
+        e = EventBuilder.withBody(value, Collections.EMPTY_MAP);
+      }
+      return e;
     }
   }
 
   /**
    * Helper function to convert a map of String to a map of CharSequence.
    */
-  private static Map<CharSequence, CharSequence> toCharSeqMap(
-    Map<String, String> stringMap) {
+  private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
     Map<CharSequence, CharSequence> charSeqMap =
-      new HashMap<CharSequence, CharSequence>();
+            new HashMap<CharSequence, CharSequence>();
     for (Map.Entry<String, String> entry : stringMap.entrySet()) {
       charSeqMap.put(entry.getKey(), entry.getValue());
     }
@@ -428,13 +525,105 @@ public class KafkaChannel extends BasicChannelSemantics {
   /**
    * Helper function to convert a map of CharSequence to a map of String.
    */
-  private static Map<String, String> toStringMap(
-    Map<CharSequence, CharSequence> charSeqMap) {
-    Map<String, String> stringMap =
-      new HashMap<String, String>();
+  private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
+    Map<String, String> stringMap = new HashMap<String, String>();
     for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
       stringMap.put(entry.getKey().toString(), entry.getValue().toString());
     }
     return stringMap;
   }
+
+  /* Object to store our consumer */
+  private class ConsumerAndRecords {
+    final KafkaConsumer<String, byte[]> consumer;
+    final String uuid;
+    final LinkedList<Event> failedEvents = new LinkedList<Event>();
+
+    ConsumerRecords<String, byte[]> records;
+    Iterator<ConsumerRecord<String, byte[]>> recordIterator;
+    Map<TopicPartition, OffsetAndMetadata> offsets;
+
+    ConsumerAndRecords(KafkaConsumer<String, byte[]> consumer, String uuid) {
+      this.consumer = consumer;
+      this.uuid = uuid;
+      this.records = ConsumerRecords.empty();
+      this.recordIterator = records.iterator();
+    }
+
+    void poll() {
+      this.records = consumer.poll(pollTimeout);
+      this.recordIterator = records.iterator();
+      logger.trace("polling");
+    }
+
+    void commitOffsets() {
+      this.consumer.commitSync(offsets);
+    }
+
+    // Log the committed offset and metadata for each currently assigned partition (debug only).
+
+    public void printCurrentAssignment() {
+      StringBuilder sb = new StringBuilder();
+      for (TopicPartition tp : this.consumer.assignment()) {
+        try {
+          sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset())
+                  .append(",").append(this.consumer.committed(tp).metadata()).append("]");
+          if (logger.isDebugEnabled()) {
+            logger.debug(sb.toString());
+          }
+        } catch (NullPointerException npe) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Committed {}", tp);
+          }
+        }
+      }
+    }
+  }
+}
+
+// Throw exception if there is an error
+class ChannelCallback implements Callback {
+  private static final Logger log = LoggerFactory.getLogger(ChannelCallback.class);
+  private int index;
+  private long startTime;
+
+  public ChannelCallback(int index, long startTime) {
+    this.index = index;
+    this.startTime = startTime;
+  }
+
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    if (exception != null) {
+      log.trace("Error sending message to Kafka due to " + exception.getMessage());
+    }
+    if (log.isDebugEnabled()) {
+      long batchElapsedTime = System.currentTimeMillis() - startTime;
+      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" +
+              metadata.offset() + "-" + batchElapsedTime);
+    }
+  }
+}
+
+class ChannelRebalanceListener implements ConsumerRebalanceListener {
+  private static final Logger log = LoggerFactory.getLogger(ChannelRebalanceListener.class);
+  private AtomicBoolean rebalanceFlag;
+
+  public ChannelRebalanceListener(AtomicBoolean rebalanceFlag) {
+    this.rebalanceFlag = rebalanceFlag;
+  }
+
+  // Set a flag that a rebalance has occurred. Then we can commit the currently written transactions
+  // on the next doTake() pass.
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+      rebalanceFlag.set(true);
+    }
+  }
+
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 9a342ef..faf46b6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -18,27 +18,45 @@
  */
 package org.apache.flume.channel.kafka;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+
 public class KafkaChannelConfiguration {
 
   public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+  public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
+  public static final String DEFAULT_ACKS = "all";
+  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
+  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String DEFAULT_TOPIC = "flume-channel";
+  public static final String DEFAULT_GROUP_ID = "flume";
+  public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
+  public static final long DEFAULT_POLL_TIMEOUT = 500;
+
+  public static final String KEY_HEADER = "key";
+
+  public static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
+
+  public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
+  public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
+
+  /*** Old Configuration Parameters ****/
   public static final String BROKER_LIST_KEY = "metadata.broker.list";
   public static final String REQUIRED_ACKS_KEY = "request.required.acks";
   public static final String BROKER_LIST_FLUME_KEY = "brokerList";
-  public static final String TOPIC = "topic";
-  public static final String GROUP_ID = "group.id";
+  //public static final String TOPIC = "topic";
   public static final String GROUP_ID_FLUME = "groupId";
   public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
   public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
   public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
-  public static final String DEFAULT_GROUP_ID = "flume";
-  public static final String DEFAULT_TOPIC = "flume-channel";
   public static final String TIMEOUT = "timeout";
   public static final String DEFAULT_TIMEOUT = "100";
   public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
 
-  public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
-  public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
-
   public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
   public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
 }
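
For readers migrating existing configurations, the translateOldProps() logic in KafkaChannel.java above maps the deprecated keys onto their new equivalents roughly as follows (a summary sketch, not part of the patch):

    brokerList          -> kafka.bootstrap.servers
    topic               -> kafka.topic
    groupId             -> kafka.consumer.group.id
    readSmallestOffset  -> kafka.consumer.auto.offset.reset  (true -> earliest, false -> latest)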

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 319e779..637428d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -19,13 +19,8 @@
 package org.apache.flume.channel.kafka;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -33,17 +28,26 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.*;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
 public class TestKafkaChannel {
 
+  private final static Logger LOGGER =
+          LoggerFactory.getLogger(TestKafkaChannel.class);
+
   private static TestUtil testUtil = TestUtil.getInstance();
   private String topic = null;
   private final Set<String> usedTopics = new HashSet<String>();
@@ -78,11 +82,52 @@ public class TestKafkaChannel {
     testUtil.tearDown();
   }
 
+  //Make sure the props are picked up correctly.
+  @Test
+  public void testProps() throws Exception {
+    Context context = new Context();
+    context.put("kafka.producer.some-parameter", "1");
+    context.put("kafka.consumer.another-parameter", "1");
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(TOPIC_CONFIG, topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty("some-parameter"), "1");
+    Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
+  }
+
+  @Test
+  public void testOldConfig() throws Exception {
+    Context context = new Context();
+    context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl());
+    context.put(GROUP_ID_FLUME,"flume-something");
+    context.put(READ_SMALLEST_OFFSET,"true");
+    context.put("topic",topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl());
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something");
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+
+  }
+
+
   @Test
   public void testSuccess() throws Exception {
     doTestSuccessRollback(false, false);
   }
 
+
   @Test
   public void testSuccessInterleave() throws Exception {
     doTestSuccessRollback(false, true);
@@ -99,7 +144,7 @@ public class TestKafkaChannel {
   }
 
   private void doTestSuccessRollback(final boolean rollback,
-    final boolean interleave) throws Exception {
+                                     final boolean interleave) throws Exception {
     final KafkaChannel channel = startChannel(true);
     writeAndVerify(rollback, channel, interleave);
     channel.stop();
@@ -122,82 +167,89 @@ public class TestKafkaChannel {
   }
 
   @Test
-  public void testNoParsingAsFlumeAgent() throws Exception {
+  public void testParseAsFlumeEventFalse() throws Exception {
+    doParseAsFlumeEventFalse(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseCheckHeader() throws Exception {
+    doParseAsFlumeEventFalse(true);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSource() throws Exception {
+    doParseAsFlumeEventFalseAsSource(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
+    doParseAsFlumeEventFalseAsSource(true);
+  }
+
+  private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
-    Producer<String, byte[]> producer = new Producer<String, byte[]>(
-      new ProducerConfig(channel.getKafkaConf()));
-    List<KeyedMessage<String, byte[]>> original = Lists.newArrayList();
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
+
     for (int i = 0; i < 50; i++) {
-      KeyedMessage<String, byte[]> data = new KeyedMessage<String,
-        byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6),
-        String.valueOf(i).getBytes());
-      original.add(data);
+      ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes());
+      producer.send(data).get();
     }
-    producer.send(original);
     ExecutorCompletionService<Void> submitterSvc = new
-      ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
     List<Event> events = pullEvents(channel, submitterSvc,
-      50, false, false);
+            50, false, false);
     wait(submitterSvc, 5);
-    Set<Integer> finals = Sets.newHashSet();
+    Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.contains(i));
+      Assert.assertTrue(finals.keySet().contains(i));
+      if (checkHeaders) {
+        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+      }
       finals.remove(i);
     }
     Assert.assertTrue(finals.isEmpty());
     channel.stop();
   }
 
-  @Test
-  public void testTimeoutConfig() throws Exception {
-    Context context = prepareDefaultContext(true);
-    KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
-      .equals(DEFAULT_TIMEOUT));
-
-    String timeout = "1000";
-    context.put("kafka."+CONSUMER_TIMEOUT, timeout);
-    channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
-            .equals(timeout));
-  }
-
   /**
    * Like the previous test but here we write to the channel like a Flume source would do
    * to verify that the events are written as text and not as an Avro object
    *
    * @throws Exception
    */
-  @Test
-  public void testWritingToNoParsingAsFlumeAgent() throws Exception {
+  public void doParseAsFlumeEventFalseAsSource(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
 
     List<String> msgs = new ArrayList<String>();
-    for (int i = 0; i < 50; i++){
+    Map<String, String> headers = new HashMap<String, String>();
+    for (int i = 0; i < 50; i++) {
       msgs.add(String.valueOf(i));
     }
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for (int i = 0; i < msgs.size(); i++){
-      channel.put(EventBuilder.withBody(msgs.get(i).getBytes()));
+    for (int i = 0; i < msgs.size(); i++) {
+      headers.put(KEY_HEADER, String.valueOf(i) + "-header");
+      channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
     }
     tx.commit();
     ExecutorCompletionService<Void> submitterSvc = new
             ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
     List<Event> events = pullEvents(channel, submitterSvc,
-      50, false, false);
+            50, false, false);
     wait(submitterSvc, 5);
-    Set<Integer> finals = Sets.newHashSet();
+    Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.contains(i));
+      Assert.assertTrue(finals.keySet().contains(i));
+      if (checkHeaders) {
+        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+      }
       finals.remove(i);
     }
     Assert.assertTrue(finals.isEmpty());
@@ -216,12 +268,12 @@ public class TestKafkaChannel {
    * @throws Exception
    */
   private void doTestStopAndStart(boolean rollback,
-    boolean retryAfterRollback) throws Exception {
+                                  boolean retryAfterRollback) throws Exception {
     final KafkaChannel channel = startChannel(true);
     ExecutorService underlying = Executors
-      .newCachedThreadPool();
+            .newCachedThreadPool();
     ExecutorCompletionService<Void> submitterSvc =
-      new ExecutorCompletionService<Void>(underlying);
+            new ExecutorCompletionService<Void>(underlying);
     final List<List<Event>> events = createBaseList();
     putEvents(channel, events, submitterSvc);
     int completed = 0;
@@ -233,14 +285,14 @@ public class TestKafkaChannel {
       total = 40;
     }
     final List<Event> eventsPulled =
-      pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
+            pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
     wait(submitterSvc, 5);
     channel2.stop();
     if (!retryAfterRollback && rollback) {
       final KafkaChannel channel3 = startChannel(true);
       int expectedRemaining = 50 - eventsPulled.size();
       final List<Event> eventsPulled2 =
-        pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
+              pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
       wait(submitterSvc, 5);
       Assert.assertEquals(expectedRemaining, eventsPulled2.size());
       eventsPulled.addAll(eventsPulled2);
@@ -259,18 +311,18 @@ public class TestKafkaChannel {
   }
 
   private void writeAndVerify(final boolean testRollbacks,
-    final KafkaChannel channel) throws Exception {
+                              final KafkaChannel channel) throws Exception {
     writeAndVerify(testRollbacks, channel, false);
   }
 
   private void writeAndVerify(final boolean testRollbacks,
-    final KafkaChannel channel, final boolean interleave) throws Exception {
+                              final KafkaChannel channel, final boolean interleave) throws Exception {
 
     final List<List<Event>> events = createBaseList();
 
     ExecutorCompletionService<Void> submitterSvc =
-      new ExecutorCompletionService<Void>(Executors
-        .newCachedThreadPool());
+            new ExecutorCompletionService<Void>(Executors
+                    .newCachedThreadPool());
 
     putEvents(channel, events, submitterSvc);
 
@@ -279,11 +331,11 @@ public class TestKafkaChannel {
     }
 
     ExecutorCompletionService<Void> submitterSvc2 =
-      new ExecutorCompletionService<Void>(Executors
-        .newCachedThreadPool());
+            new ExecutorCompletionService<Void>(Executors
+                    .newCachedThreadPool());
 
     final List<Event> eventsPulled =
-      pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+            pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
 
     if (!interleave) {
       wait(submitterSvc, 5);
@@ -301,7 +353,7 @@ public class TestKafkaChannel {
       for (int j = 0; j < 10; j++) {
         Map<String, String> hdrs = new HashMap<String, String>();
         String v = (String.valueOf(i) + " - " + String
-          .valueOf(j));
+                .valueOf(j));
         hdrs.put("header", v);
         eventList.add(EventBuilder.withBody(v.getBytes(), hdrs));
       }
@@ -310,7 +362,7 @@ public class TestKafkaChannel {
   }
 
   private void putEvents(final KafkaChannel channel, final List<List<Event>>
-    events, ExecutorCompletionService<Void> submitterSvc) {
+          events, ExecutorCompletionService<Void> submitterSvc) {
     for (int i = 0; i < 5; i++) {
       final int index = i;
       submitterSvc.submit(new Callable<Void>() {
@@ -334,10 +386,10 @@ public class TestKafkaChannel {
   }
 
   private List<Event> pullEvents(final KafkaChannel channel,
-    ExecutorCompletionService<Void> submitterSvc, final int total,
-    final boolean testRollbacks, final boolean retryAfterRollback) {
+                                 ExecutorCompletionService<Void> submitterSvc, final int total,
+                                 final boolean testRollbacks, final boolean retryAfterRollback) {
     final List<Event> eventsPulled = Collections.synchronizedList(new
-      ArrayList<Event>(50));
+            ArrayList<Event>(50));
     final CyclicBarrier barrier = new CyclicBarrier(5);
     final AtomicInteger counter = new AtomicInteger(0);
     final AtomicInteger rolledBackCount = new AtomicInteger(0);
@@ -366,9 +418,9 @@ public class TestKafkaChannel {
                 eventsLocal.add(e);
               } else {
                 if (testRollbacks &&
-                  index == 4 &&
-                  (!rolledBack.get()) &&
-                  startedGettingEvents.get()) {
+                        index == 4 &&
+                        (!rolledBack.get()) &&
+                        startedGettingEvents.get()) {
                   tx.rollback();
                   tx.close();
                   tx = null;
@@ -407,7 +459,7 @@ public class TestKafkaChannel {
   }
 
   private void wait(ExecutorCompletionService<Void> submitterSvc, int max)
-    throws Exception {
+          throws Exception {
     int completed = 0;
     while (completed < max) {
       submitterSvc.take();
@@ -420,8 +472,7 @@ public class TestKafkaChannel {
     Assert.assertEquals(50, eventsPulled.size());
     Set<String> eventStrings = new HashSet<String>();
     for (Event e : eventsPulled) {
-      Assert
-        .assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
+      Assert.assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
       eventStrings.add(e.getHeaders().get("header"));
     }
     for (int i = 0; i < 5; i++) {
@@ -437,14 +488,10 @@ public class TestKafkaChannel {
   private Context prepareDefaultContext(boolean parseAsFlume) {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
-    context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY,
-      testUtil.getKafkaServerUrl());
-    context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY,
-      testUtil.getZkUrl());
-    context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT,
-      String.valueOf(parseAsFlume));
-    context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true");
-    context.put(KafkaChannelConfiguration.TOPIC, topic);
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
+    context.put(TOPIC_CONFIG, topic);
+
     return context;
   }
 
@@ -452,22 +499,18 @@ public class TestKafkaChannel {
     int numPartitions = 5;
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
-      sessionTimeoutMs, connectionTimeoutMs,
-      ZKStringSerializer$.MODULE$);
+    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
 
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkClient, topicName, numPartitions,
-      replicationFactor, topicConfig);
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
+            replicationFactor, topicConfig);
   }
 
   public static void deleteTopic(String topicName) {
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
-      sessionTimeoutMs, connectionTimeoutMs,
-      ZKStringSerializer$.MODULE$);
-    AdminUtils.deleteTopic(zkClient, topicName);
+    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    AdminUtils.deleteTopic(zkUtils, topicName);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index c10c89d..216bfd8 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -38,7 +38,7 @@ port=9092
 #advertised.port=<port accessible by clients>
 
 # The number of threads handling network requests
-num.network.threads=2
+num.network.threads=4
 
 # The number of threads doing disk I/O
 num.io.threads=8

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 15f27c3..5149ab5 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2679,36 +2679,60 @@ The Kafka channel can be used for multiple scenarios:
 * With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
 * With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr
 
+
+This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.
+
+The configuration parameters are organized as follows:
+1) Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type
+2) Configuration values related to Kafka or how the channel operates are prefixed with "kafka." (these are analogous to Kafka's CommonClientConfigs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates.
+3) Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
+4) Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks
+
+This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.
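
To make the prefix convention above concrete, here is a minimal Java sketch of how keys under ``kafka.``, ``kafka.producer.`` and ``kafka.consumer.`` can be split out of a Flume Context into separate Kafka client Properties. The class name and prefix constants are illustrative assumptions for the sketch; the channel's own configure() logic may differ in detail:

    import java.util.Properties;
    import org.apache.flume.Context;

    public class KafkaPrefixDemo {
      // Illustrative prefixes matching the convention described above.
      private static final String PRODUCER_PREFIX = "kafka.producer.";
      private static final String CONSUMER_PREFIX = "kafka.consumer.";

      public static void main(String[] args) {
        Context context = new Context();
        context.put("kafka.bootstrap.servers", "kafka-1:9092,kafka-2:9092");
        context.put("kafka.topic", "channel1");
        context.put("kafka.producer.acks", "all");
        context.put("kafka.consumer.auto.offset.reset", "earliest");

        // Keys under kafka.producer.* / kafka.consumer.* are stripped of the
        // prefix and handed to the respective Kafka client unchanged.
        Properties producerProps = new Properties();
        producerProps.putAll(context.getSubProperties(PRODUCER_PREFIX));
        producerProps.put("bootstrap.servers", context.getString("kafka.bootstrap.servers"));

        Properties consumerProps = new Properties();
        consumerProps.putAll(context.getSubProperties(CONSUMER_PREFIX));
        consumerProps.put("bootstrap.servers", context.getString("kafka.bootstrap.servers"));

        System.out.println("producer: " + producerProps);
        System.out.println("consumer: " + consumerProps);
      }
    }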
+
 Required properties are in **bold**.
 
-======================  ==========================  ===============================================================================================================
-Property Name           Default                           Description
-======================  ==========================  ===============================================================================================================
-**type**                --                          The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
-**brokerList**          --                          List of brokers in the Kafka cluster used by the channel
-                                                    This can be a partial list of brokers, but we recommend at least two for HA.
-                                                    The format is comma separated list of hostname:port
-**zookeeperConnect**    --                          URI of ZooKeeper used by Kafka cluster
-                                                    The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
-                                                    For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
-topic                   flume-channel               Kafka topic which the channel will use
-groupId                 flume                       Consumer group ID the channel uses to register with Kafka.
-                                                    Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
-                                                    Note that having non-channel consumers with the same ID can lead to data loss.
-parseAsFlumeEvent       true                        Expecting Avro datums with FlumeEvent schema in the channel.
-                                                    This should be true if Flume source is writing to the channel
-                                                    And false if other producers are writing into the topic that the channel is using
-                                                    Flume source messages to Kafka can be parsed outside of Flume by using
-                                                    org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
-readSmallestOffset      false                       When set to true, the channel will read all data in the topic, starting from the oldest event
-                                                    when false, it will read only events written after the channel started
-                                                    When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this
-                                                    guarantees that events sent by source before sinks start will not be lost.
-Other Kafka Properties  --                          These properties are used to configure the Kafka Producer and Consumer used by the channel.
-                                                    Any property supported by Kafka can be used.
-                                                    The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                                    For example: kafka.producer.type
-======================  ==========================  ===============================================================================================================
+================================  ==========================  ===============================================================================================================
+Property Name                     Default                     Description
+================================  ==========================  ===============================================================================================================
+**type**                          --                          The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**kafka.bootstrap.servers**       --                          List of brokers in the Kafka cluster used by the channel
+                                                              This can be a partial list of brokers, but we recommend at least two for HA.
+                                                              The format is comma separated list of hostname:port
+kafka.topic                       flume-channel               Kafka topic which the channel will use
+kafka.consumer.group.id           flume                       Consumer group ID the channel uses to register with Kafka.
+                                                              Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
+                                                              Note that having non-channel consumers with the same ID can lead to data loss.
+
+parseAsFlumeEvent                 true                        Expecting Avro datums with FlumeEvent schema in the channel.
+                                                              This should be true if Flume source is writing to the channel and false if other producers are
+                                                              writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
+                                                              org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
+pollTimeout                       500                         The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
+                                                              https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+kafka.consumer.auto.offset.reset  latest                      What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
+                                                              (e.g. because that data has been deleted):
+                                                              earliest: automatically reset the offset to the earliest offset
+                                                              latest: automatically reset the offset to the latest offset
+                                                              none: throw exception to the consumer if no previous offset is found for the consumer\'s group
+                                                              anything else: throw exception to the consumer.
+================================  ==========================  ===============================================================================================================
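
As noted for parseAsFlumeEvent, records written through the channel by a Flume source can be consumed outside Flume by decoding each Kafka record value as an AvroFlumeEvent from the flume-ng-sdk artifact. The following self-contained Java round trip is a sketch under that assumption (plain binary Avro, no container file); it is an illustrative example rather than the channel's exact serialization code:

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.flume.source.avro.AvroFlumeEvent;

    public class AvroFlumeEventRoundTrip {
      public static void main(String[] args) throws Exception {
        // Encode: roughly what ends up as a Kafka record value when a Flume
        // source writes headers + body into the channel.
        Map<CharSequence, CharSequence> headers = new HashMap<CharSequence, CharSequence>();
        headers.put("header", "value");
        AvroFlumeEvent original = new AvroFlumeEvent(headers,
            ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class).write(original, encoder);
        encoder.flush();
        byte[] recordValue = out.toByteArray();

        // Decode: how an external consumer can parse the record value.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordValue, null);
        AvroFlumeEvent parsed =
            new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class).read(null, decoder);

        ByteBuffer bodyBuf = parsed.getBody();
        byte[] body = new byte[bodyBuf.remaining()];
        bodyBuf.get(body);
        System.out.println(parsed.getHeaders() + " -> " + new String(body, StandardCharsets.UTF_8));
      }
    }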
+
+Deprecated Properties
+
+================================  ==========================  ===============================================================================================================
+Property Name                     Default                     Description
+================================  ==========================  ===============================================================================================================
+brokerList                        --                          List of brokers in the Kafka cluster used by the channel
+                                                              This can be a partial list of brokers, but we recommend at least two for HA.
+                                                              The format is comma separated list of hostname:port
+topic                             flume-channel               Use kafka.topic
+groupId                           flume                       Use kafka.consumer.group.id
+readSmallestOffset                false                       Use kafka.consumer.auto.offset.reset
+
+================================  ==========================  ===============================================================================================================
 
 .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up
 
@@ -2716,12 +2740,12 @@ Example for agent named a1:
 
 .. code-block:: properties
 
-    a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
     a1.channels.channel1.capacity = 10000
     a1.channels.channel1.transactionCapacity = 1000
-    a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
-    a1.channels.channel1.topic=channel1
-    a1.channels.channel1.zookeeperConnect=kafka-1:2181
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
 
 File Channel
 ~~~~~~~~~~~~


[2/3] flume git commit: FLUME-2822: Flume-Kafka-Sink with new Producer

Posted by ja...@apache.org.
FLUME-2822: Flume-Kafka-Sink with new Producer

(Jeff Holoman via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7f588e6a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7f588e6a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7f588e6a

Branch: refs/heads/trunk
Commit: 7f588e6a158f5d108e39f50a92f8d1d108b12c24
Parents: f8abaf7
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:43:40 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:43:40 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  49 ++--
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   7 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 224 +++++++++++++++----
 .../flume/sink/kafka/KafkaSinkConstants.java    |  38 ++--
 .../apache/flume/sink/kafka/KafkaSinkUtil.java  | 103 ---------
 .../flume/sink/kafka/KafkaSinkUtilTest.java     |  55 -----
 .../apache/flume/sink/kafka/TestKafkaSink.java  |  74 +++++-
 7 files changed, 310 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 341ae42..15f27c3 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2493,7 +2493,9 @@ Kafka Sink
 This is a Flume Sink implementation that can publish data to a
 `Kafka <http://kafka.apache.org/>`_ topic. One of the objectives is to integrate Flume
 with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of releases.
+through various Flume sources. This currently supports Kafka 0.9.x series of releases.
+
+This version of Flume no longer supports older versions (0.8.x) of Kafka.
 
 Required properties are marked in bold font.
 
@@ -2502,20 +2504,21 @@ Required properties are marked in bold font.
 Property Name                    Default              Description
 ===============================  ===================  =============================================================================================
 **type**                         --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
-**brokerList**                   --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+**kafka.bootstrap.servers**      --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
                                                       This can be a partial list of brokers, but we recommend at least two for HA.
                                                       The format is comma separated list of hostname:port
-topic                            default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
+kafka.topic                      default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
                                                       messages will be published to this topic.
                                                       If the event header contains a "topic" field, the event will be published to that topic
                                                       overriding the topic configured here.
-batchSize                        100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
-requiredAcks                     1                    How many replicas must acknowledge a message before its considered successfully written.
+flumeBatchSize                   100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
+kafka.producer.acks              1                    How many replicas must acknowledge a message before it's considered successfully written.
                                                       Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
                                                       Set this to -1 to avoid data loss in some cases of leader failure.
 Other Kafka Producer Properties  --                   These properties are used to configure the Kafka Producer. Any producer property supported
-                                                      by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                                      For example: kafka.producer.type
+                                                      by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                      ``kafka.producer``.
+                                                      For example: kafka.producer.linger.ms
 ===============================  ===================  =============================================================================================
 
 .. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
@@ -2523,22 +2526,38 @@ Other Kafka Producer Properties  --                   These properties are used
             If ``key`` exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with same key
             will be sent to the same partition. If the key is null, events will be sent to random partitions.
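
A short sketch of how an upstream component (for example an interceptor or a custom source) might set these headers before events reach the Kafka sink; the topic and key values below are arbitrary examples, not anything mandated by the sink:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class KafkaSinkHeaderDemo {
      public static void main(String[] args) {
        // "topic" overrides the statically configured kafka.topic for this event;
        // "key" is used by Kafka to pick the partition, so events sharing a key
        // land in the same partition.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("topic", "orders");
        headers.put("key", "customer-42");

        Event event = EventBuilder.withBody(
            "order payload".getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(event.getHeaders() + " -> "
            + new String(event.getBody(), StandardCharsets.UTF_8));
      }
    }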
 
+The Kafka sink also provides defaults for the key.serializer (org.apache.kafka.common.serialization.StringSerializer)
+and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
+
+Deprecated Properties
+
+===============================  ===================  =============================================================================================
+Property Name                    Default              Description
+===============================  ===================  =============================================================================================
+brokerList                       --                   Use kafka.bootstrap.servers
+topic                            default-flume-topic  Use kafka.topic
+batchSize                        100                  Use flumeBatchSize
+requiredAcks                     1                    Use kafka.producer.acks
+
+===============================  ===================  =============================================================================================
+
 An example configuration of a Kafka sink is given below. Properties starting
-with the prefix ``kafka`` (the last 3 properties) are used when instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix ``kafka.producer`` are used when instantiating the Kafka producer. The properties that are passed when creating the Kafka
 producer are not limited to the properties given in this example.
-Also it's possible include your custom properties here and access them inside
+Also it is possible to include your custom properties here and access them inside
 the preprocessor through the Flume Context object passed in as a method
 argument.
 
 .. code-block:: properties
 
-    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
-    a1.sinks.k1.topic = mytopic
-    a1.sinks.k1.brokerList = localhost:9092
-    a1.sinks.k1.requiredAcks = 1
-    a1.sinks.k1.batchSize = 20
     a1.sinks.k1.channel = c1
+    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.k1.kafka.topic = mytopic
+    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
+    a1.sinks.k1.flumeBatchSize = 20
+    a1.sinks.k1.kafka.producer.acks = 1
+    a1.sinks.k1.kafka.producer.linger.ms = 1
+    a1.sinks.k1.kafka.producer.compression.type = snappy
 
 Custom Sink
 ~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 8475aa1..195c921 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -72,7 +72,14 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
+      <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 38b854b..2e2140e 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -19,20 +19,46 @@
 package org.apache.flume.sink.kafka;
 
 import com.google.common.base.Throwables;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Properties;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_HEADER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
+
 
 /**
  * A Flume Sink that can publish messages to Kafka.
@@ -67,16 +93,25 @@ import java.util.ArrayList;
 public class KafkaSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
-  public static final String KEY_HDR = "key";
-  public static final String TOPIC_HDR = "topic";
-  private Properties kafkaProps;
-  private Producer<String, byte[]> producer;
+
+  private final Properties kafkaProps = new Properties();
+  private KafkaProducer<String, byte[]> producer;
+
   private String topic;
   private int batchSize;
-  private List<KeyedMessage<String, byte[]>> messageList;
+  private List<Future<RecordMetadata>> kafkaFutures;
   private KafkaSinkCounter counter;
 
 
+  //For testing
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
     Status result = Status.READY;
@@ -92,7 +127,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
       transaction = channel.getTransaction();
       transaction.begin();
 
-      messageList.clear();
+      kafkaFutures.clear();
+      long batchStartTime = System.nanoTime();
       for (; processedEvents < batchSize; processedEvents += 1) {
         event = channel.take();
 
@@ -110,11 +146,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
         byte[] eventBody = event.getBody();
         Map<String, String> headers = event.getHeaders();
 
-        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
+        eventTopic = headers.get(TOPIC_HEADER);
+        if (eventTopic == null) {
           eventTopic = topic;
         }
-
-        eventKey = headers.get(KEY_HDR);
+        eventKey = headers.get(KEY_HEADER);
 
         if (logger.isDebugEnabled()) {
           logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
@@ -123,19 +159,22 @@ public class KafkaSink extends AbstractSink implements Configurable {
         }
 
         // create a message and add to buffer
-        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
-          (eventTopic, eventKey, eventBody);
-        messageList.add(data);
-
+        long startTime = System.currentTimeMillis();
+        kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, eventBody),
+                                         new SinkCallback(startTime)));
       }
 
+      //Prevent linger.ms from holding the batch
+      producer.flush();
+
       // publish batch and commit.
       if (processedEvents > 0) {
-        long startTime = System.nanoTime();
-        producer.send(messageList);
+          for (Future<RecordMetadata> future : kafkaFutures) {
+            future.get();
+          }
         long endTime = System.nanoTime();
-        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
-        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
+        counter.addToKafkaEventSendTimer((endTime-batchStartTime)/(1000*1000));
+        counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
       }
 
       transaction.commit();
@@ -146,6 +185,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
       result = Status.BACKOFF;
       if (transaction != null) {
         try {
+          kafkaFutures.clear();
           transaction.rollback();
           counter.incrementRollbackCount();
         } catch (Exception e) {
@@ -166,8 +206,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
   @Override
   public synchronized void start() {
     // instantiate the producer
-    ProducerConfig config = new ProducerConfig(kafkaProps);
-    producer = new Producer<String, byte[]>(config);
+    producer = new KafkaProducer<String,byte[]>(kafkaProps);
     counter.start();
     super.start();
   }
@@ -197,31 +236,132 @@ public class KafkaSink extends AbstractSink implements Configurable {
   @Override
   public void configure(Context context) {
 
-    batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
-      KafkaSinkConstants.DEFAULT_BATCH_SIZE);
-    messageList =
-      new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
-    logger.debug("Using batch size: {}", batchSize);
-
-    topic = context.getString(KafkaSinkConstants.TOPIC,
-      KafkaSinkConstants.DEFAULT_TOPIC);
-    if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
-      logger.warn("The Property 'topic' is not set. " +
-        "Using the default topic name: " +
-        KafkaSinkConstants.DEFAULT_TOPIC);
-    } else {
-      logger.info("Using the static topic: " + topic +
-        " this may be over-ridden by event headers");
+    translateOldProps(context);
+
+    String topicStr = context.getString(TOPIC_CONFIG);
+    if (topicStr == null || topicStr.isEmpty()) {
+      topicStr = DEFAULT_TOPIC;
+      logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
+    }
+    else {
+      logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
+    }
+
+    topic = topicStr;
+
+    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Using batch size: {}", batchSize);
+    }
+
+    kafkaFutures = new LinkedList<Future<RecordMetadata>>();
+
+    String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG);
+    if (bootStrapServers == null || bootStrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
-    kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
+    setProducerProps(context, bootStrapServers);
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Kafka producer properties: " + kafkaProps);
+      logger.debug("Kafka producer properties: {}" , kafkaProps);
     }
 
     if (counter == null) {
       counter = new KafkaSinkCounter(getName());
     }
   }
+
+  private void translateOldProps(Context ctx) {
+
+    if (!(ctx.containsKey(TOPIC_CONFIG))) {
+      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
+      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
+    }
+
+    //Broker List
+    // If there is no value we need to check and set the old param and log a warning message
+    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
+      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+      if (brokerList == null || brokerList.isEmpty()) {
+        throw new ConfigurationException("Bootstrap Servers must be specified");
+      } else {
+        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+      }
+    }
+
+    //batch Size
+    if (!(ctx.containsKey(BATCH_SIZE))) {
+      String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
+      if ( oldBatchSize != null  && !oldBatchSize.isEmpty())  {
+        ctx.put(BATCH_SIZE, oldBatchSize);
+        logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
+      }
+    }
+
+    // Acks
+    if (!(ctx.containsKey(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG))) {
+      String requiredKey = ctx.getString(
+              KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
+      if (!(requiredKey == null) && !(requiredKey.isEmpty())) {
+        ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
+        logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
+                KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
+      }
+    }
+
+    if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
+      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
+              "a different interface for serializers. Please use the parameter {}",
+              KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+    }
+
+    if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
+      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
+                      "a different interface for serializers. Please use the parameter {}",
+              MESSAGE_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+    }
+
+
+
+
+
+  }
+  private void setProducerProps(Context context, String bootStrapServers) {
+    kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+    //Defaults overridden based on config
+    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
+    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
+    kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
+    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    logger.info("Producer properties: {}" , kafkaProps.toString());
+  }
+
+  protected Properties getKafkaProps() {
+    return kafkaProps;
+  }
+}
+
+class SinkCallback implements Callback {
+  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
+  private long startTime;
+
+  public SinkCallback(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    if (exception != null) {
+      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
+    }
+
+    if (logger.isDebugEnabled()) {
+      long eventElapsedTime = System.currentTimeMillis() - startTime;
+      logger.debug("Acked message partition:{} ofset:{}",  metadata.partition(), metadata.offset());
+      logger.debug("Elapsed time for send: {}", eventElapsedTime);
+    }
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 3ee12de..c84dec0 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -18,30 +18,42 @@
 
 package org.apache.flume.sink.kafka;
 
-import kafka.serializer.StringDecoder;
+import org.apache.kafka.clients.CommonClientConfigs;
 
 public class KafkaSinkConstants {
 
-  public static final String PROPERTY_PREFIX = "kafka.";
+  public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
 
   /* Properties */
 
-  public static final String TOPIC = "topic";
-  public static final String BATCH_SIZE = "batchSize";
+  public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
+  public static final String BATCH_SIZE = "flumeBatchSize";
+  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+  public static final String KEY_HEADER = "key";
+  public static final String TOPIC_HEADER = "topic";
+
+  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+  public static final int DEFAULT_BATCH_SIZE = 100;
+  public static final String DEFAULT_TOPIC = "default-flume-topic";
+  public static final String DEFAULT_ACKS = "1";
+
+
+
+  /* Old Properties */
+
+   /* Properties */
+
+  public static final String OLD_BATCH_SIZE = "batchSize";
   public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
   public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
-  public static final String BROKER_LIST_KEY = "metadata.broker.list";
-  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
   public static final String BROKER_LIST_FLUME_KEY = "brokerList";
   public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
 
 
 
-  public static final int DEFAULT_BATCH_SIZE = 100;
-  public static final String DEFAULT_TOPIC = "default-flume-topic";
-  public static final String DEFAULT_MESSAGE_SERIALIZER =
-          "kafka.serializer.DefaultEncoder";
-  public static final String DEFAULT_KEY_SERIALIZER =
-          "kafka.serializer.StringEncoder";
-  public static final String DEFAULT_REQUIRED_ACKS = "1";
 }
+

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
deleted file mode 100644
index 66bde85..0000000
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kafka;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.util.PropertiesTrait;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaSinkUtil {
-
-  private static final Logger log =
-          LoggerFactory.getLogger(KafkaSinkUtil.class);
-
-  public static Properties getKafkaProperties(Context context) {
-    log.info("context={}",context.toString());
-    Properties props =  generateDefaultKafkaProps();
-    setKafkaProps(context, props);
-    addDocumentedKafkaProps(context, props);
-    return props;
-  }
-
-  /**
-   * Some of the producer properties are especially important
-   * We documented them and gave them a camel-case name to match Flume config
-   * If user set these, we will override any existing parameters with these
-   * settings.
-   * Knowledge of which properties are documented is maintained here for now.
-   * If this will become a maintenance issue we'll set a proper data structure.
-   */
-  private static void addDocumentedKafkaProps(Context context,
-                                              Properties kafkaProps)
-          throws ConfigurationException {
-    String brokerList = context.getString(KafkaSinkConstants
-            .BROKER_LIST_FLUME_KEY);
-    if (brokerList == null) {
-      throw new ConfigurationException("brokerList must contain at least " +
-              "one Kafka broker");
-    }
-    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
-
-    String requiredKey = context.getString(
-            KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
-
-    if (requiredKey != null ) {
-      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
-    }
-  }
-
-
-  /**
-   * Generate producer properties object with some defaults
-   * @return
-   */
-  private static Properties generateDefaultKafkaProps() {
-    Properties props = new Properties();
-    props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
-            KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
-    props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
-            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
-    props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
-            KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);
-    return props;
-  }
-
-
-  /**
-   * Add all configuration parameters starting with "kafka"
-   * to producer properties
-   */
-  private static void setKafkaProps(Context context, Properties kafkaProps) {
-
-    Map<String,String> kafkaProperties =
-            context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
-
-    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
-
-      kafkaProps.put(prop.getKey(), prop.getValue());
-      if (log.isDebugEnabled()) {
-        log.debug("Reading a Kafka Producer Property: key: "
-                + prop.getKey() + ", value: " + prop.getValue());
-      }
-    }
-  }
-}
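
Functionally, the removed KafkaSinkUtil boils down to one mechanism: seed a Properties object with producer defaults, then copy every Flume context entry under the "kafka." prefix on top of it (prefix stripped), so explicit settings win over the defaults and the documented camel-case keys win over both. A minimal standalone sketch of that mapping, using a plain Map in place of Flume's Context and illustrative key/default names rather than the real constants:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    // Standalone sketch of the prefix mapping described above: seed defaults,
    // then copy every "kafka."-prefixed entry on top (prefix stripped), so
    // explicit settings override the defaults.
    public class PrefixPropsSketch {
      private static final String PREFIX = "kafka.";

      static Properties toKafkaProps(Map<String, String> flumeConfig) {
        Properties props = new Properties();
        // illustrative defaults, not the actual KafkaSinkConstants values
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        for (Map.Entry<String, String> e : flumeConfig.entrySet()) {
          if (e.getKey().startsWith(PREFIX)) {
            props.put(e.getKey().substring(PREFIX.length()), e.getValue());
          }
        }
        return props;
      }

      public static void main(String[] args) {
        Map<String, String> config = new HashMap<String, String>();
        config.put("kafka.request.required.acks", "all");  // overrides the default
        config.put("kafka.fake.property", "some-value");   // arbitrary pass-through
        System.out.println(toKafkaProps(config));
      }
    }

In the rewritten TestKafkaSink below, the equivalent mapping is exercised through the sink itself (Configurables.configure(kafkaSink, context) followed by kafkaSink.getKafkaProps()), with producer properties now carried under the kafka.producer. prefix.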

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
deleted file mode 100644
index 84d213c..0000000
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kafka;
-
-import junit.framework.TestCase;
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurables;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaSinkUtilTest extends TestCase {
-
-  @Test
-  public void testGetKafkaProperties() {
-    Context context = new Context();
-    context.put("kafka.serializer.class", "override.default.serializer");
-    context.put("kafka.fake.property", "kafka.property.value");
-    context.put("kafka.metadata.broker.list","bad-broker-list");
-    context.put("brokerList","real-broker-list");
-    Properties kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
-
-    //check that we have defaults set
-    assertEquals(
-            kafkaProps.getProperty(KafkaSinkConstants.KEY_SERIALIZER_KEY),
-            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
-    //check that kafka properties override the default and get correct name
-    assertEquals(
-            kafkaProps.getProperty(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY),
-            "override.default.serializer");
-    //check that any kafka property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"),
-            "kafka.property.value");
-    //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("metadata.broker.list")
-            ,"real-broker-list");
-  }
-}
\ No newline at end of file
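
The test removed above drove the old configuration keys (brokerList, kafka.metadata.broker.list, and raw kafka.* entries). The rewritten TestKafkaSink below covers both the new kafka.bootstrap.servers / kafka.producer.* style and a backwards-compatibility path that maps the legacy keys onto the new producer configs (see testOldProperties). A rough, hypothetical sketch of what that translation amounts to, with key strings taken from the tests where they appear literally and otherwise assumed:

    import java.util.Properties;

    // Hypothetical helper, not Flume's actual API: map two legacy sink keys
    // onto the Kafka client property names the new tests assert on.
    public class LegacyKeyTranslationSketch {
      static Properties translate(String brokerList, String requiredAcks) {
        Properties props = new Properties();
        if (brokerList != null) {
          props.put("bootstrap.servers", brokerList);  // was "brokerList"
        }
        if (requiredAcks != null) {
          props.put("acks", requiredAcks);             // was the old required-acks key
        }
        return props;
      }

      public static void main(String[] args) {
        System.out.println(translate("localhost:9092", "all"));
        // contains bootstrap.servers=localhost:9092 and acks=all
      }
    }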

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 72117b1..1852099 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -19,11 +19,18 @@
 package org.apache.flume.sink.kafka;
 
 import kafka.message.MessageAndMetadata;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -33,12 +40,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.*;
+
 /**
  * Unit tests for Kafka Sink
  */
@@ -50,7 +59,7 @@ public class TestKafkaSink {
   public static void setup() {
     testUtil.prepare();
     List<String> topics = new ArrayList<String>(3);
-    topics.add(KafkaSinkConstants.DEFAULT_TOPIC);
+    topics.add(DEFAULT_TOPIC);
     topics.add(TestConstants.STATIC_TOPIC);
     topics.add(TestConstants.CUSTOM_TOPIC);
     testUtil.initTopicList(topics);
@@ -62,6 +71,50 @@ public class TestKafkaSink {
   }
 
   @Test
+  public void testKafkaProperties() {
+
+    KafkaSink kafkaSink = new KafkaSink();
+    Context context = new Context();
+    context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
+    context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "override.default.serializer");
+    context.put("kafka.producer.fake.property", "kafka.property.value");
+    context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
+    context.put("brokerList","real-broker-list");
+    Configurables.configure(kafkaSink,context);
+
+    Properties kafkaProps = kafkaSink.getKafkaProps();
+
+    //check that we have defaults set
+    assertEquals(
+            kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), DEFAULT_KEY_SERIALIZER);
+    //check that kafka properties override the default and get correct name
+    assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), "override.default.serializer");
+    //check that any kafka-producer property gets in
+    assertEquals(kafkaProps.getProperty("fake.property"), "kafka.property.value");
+    //check that documented property overrides defaults
+    assertEquals(kafkaProps.getProperty("bootstrap.servers") ,"localhost:9092,localhost:9092");
+  }
+
+  @Test
+  public void testOldProperties() {
+    KafkaSink kafkaSink = new KafkaSink();
+    Context context = new Context();
+    context.put("topic","test-topic");
+    context.put(OLD_BATCH_SIZE, "300");
+    context.put(BROKER_LIST_FLUME_KEY,"localhost:9092,localhost:9092");
+    context.put(REQUIRED_ACKS_FLUME_KEY, "all");
+    Configurables.configure(kafkaSink,context);
+
+    Properties kafkaProps = kafkaSink.getKafkaProps();
+
+    assertEquals(kafkaSink.getTopic(), "test-topic");
+    assertEquals(kafkaSink.getBatchSize(),300);
+    assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),"localhost:9092,localhost:9092");
+    assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
+
+  }
+
+  @Test
   public void testDefaultTopic() {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
@@ -89,7 +142,7 @@ public class TestKafkaSink {
     }
 
     String fetchedMsg = new String((byte[])
-      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)
+      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)
         .message());
     assertEquals(msg, fetchedMsg);
   }
@@ -98,7 +151,7 @@ public class TestKafkaSink {
   public void testStaticTopic() {
     Context context = prepareDefaultContext();
     // add the static topic
-    context.put(KafkaSinkConstants.TOPIC, TestConstants.STATIC_TOPIC);
+    context.put(TOPIC_CONFIG, TestConstants.STATIC_TOPIC);
     String msg = "static-topic-test";
 
     try {
@@ -110,8 +163,7 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(
-      TestConstants.STATIC_TOPIC).message());
+    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(TestConstants.STATIC_TOPIC).message());
     assertEquals(msg, fetchedMsg);
   }
 
@@ -172,16 +224,14 @@ public class TestKafkaSink {
       fail("Error Occurred");
     }
     assertNull(
-      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
+      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
   }
 
   private Context prepareDefaultContext() {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
-    context.put("brokerList", testUtil.getKafkaServerUrl());
-    context.put("kafka.request.required.acks", "1");
-    context.put("kafka.producer.type","sync");
-    context.put("batchSize", "1");
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(BATCH_SIZE, "1");
     return context;
   }