Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/29 19:44:37 UTC

[3/3] flume git commit: FLUME-2823: Flume-Kafka-Channel with new APIs

FLUME-2823: Flume-Kafka-Channel with new APIs

(Jeff Holoman via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e8c4a7bf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e8c4a7bf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e8c4a7bf

Branch: refs/heads/trunk
Commit: e8c4a7bffc74f6ea10ae6cc45adbaf4919f45186
Parents: 7f588e6
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:45:40 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:45:40 2016 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |   6 +
 .../flume/channel/kafka/KafkaChannel.java       | 641 ++++++++++++-------
 .../kafka/KafkaChannelConfiguration.java        |  32 +-
 .../flume/channel/kafka/TestKafkaChannel.java   | 219 ++++---
 .../src/test/resources/kafka-server.properties  |   2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  88 ++-
 6 files changed, 634 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index fa1bd42..587b4b4 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -40,6 +40,12 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index c0c1c66..2d9b0c6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,108 +20,120 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import kafka.consumer.*;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.avro.io.*;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.conf.ConfigurationException;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
 public class KafkaChannel extends BasicChannelSemantics {
 
-  private final static Logger LOGGER =
-    LoggerFactory.getLogger(KafkaChannel.class);
+  private final static Logger logger =
+          LoggerFactory.getLogger(KafkaChannel.class);
 
+  private final Properties consumerProps = new Properties();
+  private final Properties producerProps = new Properties();
 
-  private final Properties kafkaConf = new Properties();
-  private Producer<String, byte[]> producer;
+  private KafkaProducer<String, byte[]> producer;
   private final String channelUUID = UUID.randomUUID().toString();
 
   private AtomicReference<String> topic = new AtomicReference<String>();
   private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
-  private final Map<String, Integer> topicCountMap =
-    Collections.synchronizedMap(new HashMap<String, Integer>());
+
+  //used to indicate if a rebalance has occurred during the current transaction
+  AtomicBoolean rebalanceFlag = new AtomicBoolean();
+  //This isn't a Kafka property per se, but we allow it to be configurable
+  private long pollTimeout = DEFAULT_POLL_TIMEOUT;
+
 
   // Track all consumers to close them eventually.
-  private final List<ConsumerAndIterator> consumers =
-    Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
+  private final List<ConsumerAndRecords> consumers =
+          Collections.synchronizedList(new LinkedList<ConsumerAndRecords>());
 
   private KafkaChannelCounter counter;
 
-  /* Each ConsumerConnector commit will commit all partitions owned by it. To
+   /* Each Consumer commit will commit all partitions owned by it. To
    * ensure that each partition is only committed when all events are
-   * actually done, we will need to keep a ConsumerConnector per thread.
-   * See Neha's answer here:
-   * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic
-   * Since only one consumer connector will a partition at any point in time,
-   * when we commit the partition we would have committed all events to the
-   * final destination from that partition.
-   *
-   * If a new partition gets assigned to this connector,
-   * my understanding is that all message from the last partition commit will
-   * get replayed which may cause duplicates -- which is fine as this
-   * happens only on partition rebalancing which is on failure or new nodes
-   * coming up, which is rare.
+   * actually done, we will need to keep a Consumer per thread.
    */
-  private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new
-    ThreadLocal<ConsumerAndIterator>() {
-      @Override
-      public ConsumerAndIterator initialValue() {
-        return createConsumerAndIter();
-      }
-    };
+
+  private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new
+          ThreadLocal<ConsumerAndRecords>() {
+            @Override
+            public ConsumerAndRecords initialValue() {
+              return createConsumerAndRecords();
+            }
+          };
 
   @Override
   public void start() {
-    try {
-      LOGGER.info("Starting Kafka Channel: " + getName());
-      producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf));
+      logger.info("Starting Kafka Channel: {}", getName());
+      producer = new KafkaProducer<String, byte[]>(producerProps);
       // We always have just one topic being read by one thread
-      LOGGER.info("Topic = " + topic.get());
-      topicCountMap.put(topic.get(), 1);
+      logger.info("Topic = {}", topic.get());
       counter.start();
       super.start();
-    } catch (Exception e) {
-      LOGGER.error("Could not start producer");
-      throw new FlumeException("Unable to create Kafka Connections. " +
-        "Check whether Kafka Brokers are up and that the " +
-        "Flume agent can connect to it.", e);
-    }
   }
 
   @Override
   public void stop() {
-    for (ConsumerAndIterator c : consumers) {
+    for (ConsumerAndRecords c : consumers) {
       try {
-        decommissionConsumerAndIterator(c);
+        decommissionConsumerAndRecords(c);
       } catch (Exception ex) {
-        LOGGER.warn("Error while shutting down consumer.", ex);
+        logger.warn("Error while shutting down consumer.", ex);
       }
     }
     producer.close();
     counter.stop();
     super.stop();
-    LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(),
-        counter);
+    logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
+            counter);
   }
 
   @Override
@@ -129,98 +141,147 @@ public class KafkaChannel extends BasicChannelSemantics {
     return new KafkaTransaction();
   }
 
-  private synchronized ConsumerAndIterator createConsumerAndIter() {
-    try {
-      ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf);
-      ConsumerConnector consumer =
-        Consumer.createJavaConsumerConnector(consumerConfig);
-      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-        consumer.createMessageStreams(topicCountMap);
-      final List<KafkaStream<byte[], byte[]>> streamList = consumerMap
-        .get(topic.get());
-      KafkaStream<byte[], byte[]> stream = streamList.remove(0);
-      ConsumerAndIterator ret =
-        new ConsumerAndIterator(consumer, stream.iterator(), channelUUID);
-      consumers.add(ret);
-      LOGGER.info("Created new consumer to connect to Kafka");
-      return ret;
-    } catch (Exception e) {
-      throw new FlumeException("Unable to connect to Kafka", e);
-    }
-  }
-
-  Properties getKafkaConf() {
-    return kafkaConf;
-  }
-
   @Override
   public void configure(Context ctx) {
-    String topicStr = ctx.getString(TOPIC);
+
+    //Can remove in the next release
+    translateOldProps(ctx);
+
+    String topicStr = ctx.getString(TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
-      LOGGER
-        .info("Topic was not specified. Using " + topicStr + " as the topic.");
+      logger.info("Topic was not specified. Using {} as the topic.", topicStr);
     }
     topic.set(topicStr);
-    String groupId = ctx.getString(GROUP_ID_FLUME);
-    if (groupId == null || groupId.isEmpty()) {
-      groupId = DEFAULT_GROUP_ID;
-      LOGGER.info(
-        "Group ID was not specified. Using " + groupId + " as the group id.");
+    String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
+    if (bootStrapServers == null || bootStrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
-    String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
-    if (brokerList == null || brokerList.isEmpty()) {
-      throw new ConfigurationException("Broker List must be specified");
+
+    setProducerProps(ctx, bootStrapServers);
+    setConsumerProps(ctx, bootStrapServers);
+
+    parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
+    pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
+
+    if (counter == null) {
+      counter = new KafkaChannelCounter(getName());
     }
-    String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
-    if (zkConnect == null || zkConnect.isEmpty()) {
-      throw new ConfigurationException(
-        "Zookeeper Connection must be specified");
+  }
+
+  // We can remove this once the properties are officially deprecated
+  private void translateOldProps(Context ctx) {
+
+    if (!(ctx.containsKey(TOPIC_CONFIG))) {
+      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
+      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
     }
-    kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
-    kafkaConf.put(GROUP_ID, groupId);
-    kafkaConf.put(BROKER_LIST_KEY, brokerList);
-    kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
-    kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
-    if(kafkaConf.get(CONSUMER_TIMEOUT) == null) {
-      kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT);
+
+    //Broker List
+    // If there is no value we need to check and set the old param and log a warning message
+    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
+      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+      if (brokerList == null || brokerList.isEmpty()) {
+        throw new ConfigurationException("Bootstrap Servers must be specified");
+      } else {
+        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+      }
     }
-    kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
-    LOGGER.info(kafkaConf.toString());
-    parseAsFlumeEvent =
-      ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
-
-    boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET,
-      DEFAULT_READ_SMALLEST_OFFSET);
-    // If the data is to be parsed as Flume events, we always read the smallest.
-    // Else, we read the configuration, which by default reads the largest.
-    if (parseAsFlumeEvent || readSmallest) {
-      // readSmallest is eval-ed only if parseAsFlumeEvent is false.
-      // The default is largest, so we don't need to set it explicitly.
-      kafkaConf.put("auto.offset.reset", "smallest");
+
+    //GroupId
+    // If there is an old Group Id set, then use that if no groupId is set.
+    if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
+      String oldGroupId = ctx.getString(GROUP_ID_FLUME);
+      if (oldGroupId != null && !oldGroupId.isEmpty()) {
+        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
+        logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+      }
     }
 
-    if (counter == null) {
-      counter = new KafkaChannelCounter(getName());
+    if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) {
+      Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
+      String auto;
+      if (oldReadSmallest != null) {
+        if (oldReadSmallest) {
+          auto = "earliest";
+        } else {
+          auto = "latest";
+        }
+        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
+        logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+      }
+
+    }
+  }
+
+
+  private void setProducerProps(Context ctx, String bootStrapServers) {
+    producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
+    //Defaults overridden based on config
+    producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    logger.info("Producer properties: " + producerProps.toString());
+  }
+
+  protected Properties getProducerProps() {
+    return producerProps;
+  }
+
+  private void setConsumerProps(Context ctx, String bootStrapServers) {
+    String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    if (groupId == null || groupId.isEmpty()) {
+      groupId = DEFAULT_GROUP_ID;
+      logger.info("Group ID was not specified. Using {} as the group id.", groupId);
     }
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
+    //Defaults overridden based on config
+    consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
+    //These always take precedence over config
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    logger.info(consumerProps.toString());
+  }
 
+  protected Properties getConsumerProps() { return consumerProps; }
+
+
+  private synchronized ConsumerAndRecords createConsumerAndRecords() {
+    try {
+      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
+      ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
+      logger.info("Created new consumer to connect to Kafka");
+      car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
+      car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+      consumers.add(car);
+      return car;
+    } catch (Exception e) {
+      throw new FlumeException("Unable to connect to Kafka", e);
+    }
   }
 
-  private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
+  private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
     if (c.failedEvents.isEmpty()) {
-      c.consumer.commitOffsets();
+      c.commitOffsets();
     }
     c.failedEvents.clear();
-    c.consumer.shutdown();
+    c.consumer.close();
   }
 
-  // Force a consumer to be initialized. There are  many duplicates in
-  // tests due to rebalancing - making testing tricky. In production,
-  // this is less of an issue as
-  // rebalancing would happen only on startup.
   @VisibleForTesting
   void registerThread() {
-    consumerAndIter.get();
+    try {
+      consumerAndRecords.get();
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+      e.printStackTrace();
+    }
   }
 
   private enum TransactionType {
@@ -233,54 +294,41 @@ public class KafkaChannel extends BasicChannelSemantics {
   private class KafkaTransaction extends BasicTransactionSemantics {
 
     private TransactionType type = TransactionType.NONE;
-    // For Puts
     private Optional<ByteArrayOutputStream> tempOutStream = Optional
-      .absent();
-
-    // For put transactions, serialize the events and batch them and send it.
-    private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent();
+            .absent();
+    // For put transactions, serialize the events and hold them until the commit is requested.
+    private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent();
     // For take transactions, deserialize and hold them till commit goes through
     private Optional<LinkedList<Event>> events = Optional.absent();
     private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
-      Optional.absent();
+            Optional.absent();
     private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
-      Optional.absent();
+            Optional.absent();
+    private Optional<LinkedList<Future<RecordMetadata>>> kafkaFutures =
+            Optional.absent();
+    private final String batchUUID = UUID.randomUUID().toString();
 
     // Fine to use null for initial value, Avro will create new ones if this
     // is null
     private BinaryEncoder encoder = null;
     private BinaryDecoder decoder = null;
-    private final String batchUUID = UUID.randomUUID().toString();
     private boolean eventTaken = false;
 
     @Override
+    protected void doBegin() throws InterruptedException {
+      rebalanceFlag.set(false);
+    }
+
+    @Override
     protected void doPut(Event event) throws InterruptedException {
       type = TransactionType.PUT;
-      if (!serializedEvents.isPresent()) {
-        serializedEvents = Optional.of(new LinkedList<byte[]>());
+      if (!producerRecords.isPresent()) {
+        producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
       }
-
+      String key = event.getHeaders().get(KEY_HEADER);
       try {
-        if (parseAsFlumeEvent) {
-          if (!tempOutStream.isPresent()) {
-            tempOutStream = Optional.of(new ByteArrayOutputStream());
-          }
-          if (!writer.isPresent()) {
-            writer = Optional.of(new
-              SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
-          }
-          tempOutStream.get().reset();
-          AvroFlumeEvent e = new AvroFlumeEvent(
-            toCharSeqMap(event.getHeaders()),
-            ByteBuffer.wrap(event.getBody()));
-          encoder = EncoderFactory.get()
-            .directBinaryEncoder(tempOutStream.get(), encoder);
-          writer.get().write(e, encoder);
-          // Not really possible to avoid this copy :(
-          serializedEvents.get().add(tempOutStream.get().toByteArray());
-        } else {
-          serializedEvents.get().add(event.getBody());
-        }
+        producerRecords.get().add(new ProducerRecord<String, byte[]>
+                (topic.get(), key, serializeValue(event, parseAsFlumeEvent)));
       } catch (Exception e) {
         throw new ChannelException("Error while serializing event", e);
       }
@@ -291,53 +339,64 @@ public class KafkaChannel extends BasicChannelSemantics {
     protected Event doTake() throws InterruptedException {
       type = TransactionType.TAKE;
       try {
-        if (!(consumerAndIter.get().uuid.equals(channelUUID))) {
-          LOGGER.info("UUID mismatch, creating new consumer");
-          decommissionConsumerAndIterator(consumerAndIter.get());
-          consumerAndIter.remove();
+        if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
+          logger.info("UUID mismatch, creating new consumer");
+          decommissionConsumerAndRecords(consumerAndRecords.get());
+          consumerAndRecords.remove();
         }
       } catch (Exception ex) {
-        LOGGER.warn("Error while shutting down consumer", ex);
+        logger.warn("Error while shutting down consumer", ex);
       }
       if (!events.isPresent()) {
         events = Optional.of(new LinkedList<Event>());
       }
       Event e;
-      if (!consumerAndIter.get().failedEvents.isEmpty()) {
-        e = consumerAndIter.get().failedEvents.removeFirst();
+      // Give the channel a chance to commit if there has been a rebalance
+      if (rebalanceFlag.get()) {
+        logger.debug("Returning null event after Consumer rebalance.");
+        return null;
+      }
+      if (!consumerAndRecords.get().failedEvents.isEmpty()) {
+        e = consumerAndRecords.get().failedEvents.removeFirst();
       } else {
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("Assigment: {}", consumerAndRecords.get().consumer.assignment().toString());
+        }
+
         try {
-          ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
           long startTime = System.nanoTime();
-          it.hasNext();
-          long endTime = System.nanoTime();
-          counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
-          if (parseAsFlumeEvent) {
-            ByteArrayInputStream in =
-              new ByteArrayInputStream(it.next().message());
-            decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
-            if (!reader.isPresent()) {
-              reader = Optional.of(
-                new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
-            }
-            AvroFlumeEvent event = reader.get().read(null, decoder);
-            e = EventBuilder.withBody(event.getBody().array(),
-              toStringMap(event.getHeaders()));
-          } else {
-            e = EventBuilder.withBody(it.next().message(),
-              Collections.EMPTY_MAP);
+          if (!consumerAndRecords.get().recordIterator.hasNext()) {
+            consumerAndRecords.get().poll();
           }
+          if (consumerAndRecords.get().recordIterator.hasNext()) {
+            ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
+            e = deserializeValue(record.value(), parseAsFlumeEvent);
+            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+            OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
+            consumerAndRecords.get().offsets.put(tp, oam);
+
+            if (logger.isTraceEnabled()) {
+              logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
+            }
 
-        } catch (ConsumerTimeoutException ex) {
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Timed out while waiting for data to come from Kafka",
-              ex);
+            //Add the key to the header
+            e.getHeaders().put(KEY_HEADER, record.key());
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
+            }
+
+            long endTime = System.nanoTime();
+            counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+          } else {
+            return null;
           }
-          return null;
         } catch (Exception ex) {
-          LOGGER.warn("Error while getting events from Kafka", ex);
+          logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " +
+                  "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex);
           throw new ChannelException("Error while getting events from Kafka",
-            ex);
+                  ex);
         }
       }
       eventTaken = true;
@@ -351,32 +410,41 @@ public class KafkaChannel extends BasicChannelSemantics {
         return;
       }
       if (type.equals(TransactionType.PUT)) {
+        if (!kafkaFutures.isPresent()) {
+          kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
+        }
         try {
-          List<KeyedMessage<String, byte[]>> messages = new
-            ArrayList<KeyedMessage<String, byte[]>>(serializedEvents.get()
-            .size());
-          for (byte[] event : serializedEvents.get()) {
-            messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
-              batchUUID, event));
-          }
+          long batchSize = producerRecords.get().size();
           long startTime = System.nanoTime();
-          producer.send(messages);
+          int index = 0;
+          for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
+            index++;
+            kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
+          }
+          // flush() so that linger.ms does not delay delivery of the batch
+          producer.flush();
+
+          for (Future<RecordMetadata> future : kafkaFutures.get()) {
+            future.get();
+          }
           long endTime = System.nanoTime();
-          counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
-          counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
-          serializedEvents.get().clear();
+          counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
+          counter.addToEventPutSuccessCount(batchSize);
+          producerRecords.get().clear();
+          kafkaFutures.get().clear();
         } catch (Exception ex) {
-          LOGGER.warn("Sending events to Kafka failed", ex);
+          logger.warn("Sending events to Kafka failed", ex);
           throw new ChannelException("Commit failed as send to Kafka failed",
-            ex);
+                  ex);
         }
       } else {
-        if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+        if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
           long startTime = System.nanoTime();
-          consumerAndIter.get().consumer.commitOffsets();
+          consumerAndRecords.get().commitOffsets();
           long endTime = System.nanoTime();
-          counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000));
-         }
+          counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
+          consumerAndRecords.get().printCurrentAssignment();
+        }
         counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
         events.get().clear();
       }
@@ -388,37 +456,66 @@ public class KafkaChannel extends BasicChannelSemantics {
         return;
       }
       if (type.equals(TransactionType.PUT)) {
-        serializedEvents.get().clear();
+        producerRecords.get().clear();
+        kafkaFutures.get().clear();
       } else {
         counter.addToRollbackCounter(Long.valueOf(events.get().size()));
-        consumerAndIter.get().failedEvents.addAll(events.get());
+        consumerAndRecords.get().failedEvents.addAll(events.get());
         events.get().clear();
       }
     }
-  }
-
 
-  private class ConsumerAndIterator {
-    final ConsumerConnector consumer;
-    final ConsumerIterator<byte[], byte[]> iterator;
-    final String uuid;
-    final LinkedList<Event> failedEvents = new LinkedList<Event>();
+    private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOException {
+      byte[] bytes;
+      if (parseAsFlumeEvent) {
+        if (!tempOutStream.isPresent()) {
+          tempOutStream = Optional.of(new ByteArrayOutputStream());
+        }
+        if (!writer.isPresent()) {
+          writer = Optional.of(new
+                  SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+        }
+        tempOutStream.get().reset();
+        AvroFlumeEvent e = new AvroFlumeEvent(
+                toCharSeqMap(event.getHeaders()),
+                ByteBuffer.wrap(event.getBody()));
+        encoder = EncoderFactory.get()
+                .directBinaryEncoder(tempOutStream.get(), encoder);
+        writer.get().write(e, encoder);
+        encoder.flush();
+        bytes = tempOutStream.get().toByteArray();
+      } else {
+        bytes = event.getBody();
+      }
+      return bytes;
+    }
 
-    ConsumerAndIterator(ConsumerConnector consumerConnector,
-      ConsumerIterator<byte[], byte[]> iterator, String uuid) {
-      this.consumer = consumerConnector;
-      this.iterator = iterator;
-      this.uuid = uuid;
+    private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
+      Event e;
+      if (parseAsFlumeEvent) {
+        ByteArrayInputStream in =
+                new ByteArrayInputStream(value);
+        decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+        if (!reader.isPresent()) {
+          reader = Optional.of(
+                  new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
+        }
+        AvroFlumeEvent event = reader.get().read(null, decoder);
+        e = EventBuilder.withBody(event.getBody().array(),
+                toStringMap(event.getHeaders()));
+      } else {
+        e = EventBuilder.withBody(value, Collections.EMPTY_MAP);
+      }
+      return e;
     }
   }
 
   /**
    * Helper function to convert a map of String to a map of CharSequence.
    */
-  private static Map<CharSequence, CharSequence> toCharSeqMap(
-    Map<String, String> stringMap) {
+  private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
     Map<CharSequence, CharSequence> charSeqMap =
-      new HashMap<CharSequence, CharSequence>();
+            new HashMap<CharSequence, CharSequence>();
     for (Map.Entry<String, String> entry : stringMap.entrySet()) {
       charSeqMap.put(entry.getKey(), entry.getValue());
     }
@@ -428,13 +525,105 @@ public class KafkaChannel extends BasicChannelSemantics {
   /**
    * Helper function to convert a map of CharSequence to a map of String.
    */
-  private static Map<String, String> toStringMap(
-    Map<CharSequence, CharSequence> charSeqMap) {
-    Map<String, String> stringMap =
-      new HashMap<String, String>();
+  private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
+    Map<String, String> stringMap = new HashMap<String, String>();
     for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
       stringMap.put(entry.getKey().toString(), entry.getValue().toString());
     }
     return stringMap;
   }
+
+  /* Object to store our consumer */
+  private class ConsumerAndRecords {
+    final KafkaConsumer<String, byte[]> consumer;
+    final String uuid;
+    final LinkedList<Event> failedEvents = new LinkedList<Event>();
+
+    ConsumerRecords<String, byte[]> records;
+    Iterator<ConsumerRecord<String, byte[]>> recordIterator;
+    Map<TopicPartition, OffsetAndMetadata> offsets;
+
+    ConsumerAndRecords(KafkaConsumer<String, byte[]> consumer, String uuid) {
+      this.consumer = consumer;
+      this.uuid = uuid;
+      this.records = ConsumerRecords.empty();
+      this.recordIterator = records.iterator();
+    }
+
+    void poll() {
+      this.records = consumer.poll(pollTimeout);
+      this.recordIterator = records.iterator();
+      logger.trace("polling");
+    }
+
+    void commitOffsets() {
+      this.consumer.commitSync(offsets);
+    }
+
+    // Logs the committed offset and metadata for each currently assigned partition (debug level).
+
+    public void printCurrentAssignment() {
+      StringBuilder sb = new StringBuilder();
+      for (TopicPartition tp : this.consumer.assignment()) {
+        try {
+          sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset())
+                  .append(",").append(this.consumer.committed(tp).metadata()).append("]");
+          if (logger.isDebugEnabled()) {
+            logger.debug(sb.toString());
+          }
+        } catch (NullPointerException npe) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Committed {}", tp);
+          }
+        }
+      }
+    }
+  }
+}
+
+// Logs the outcome of each send; a failed send surfaces through the returned Future in doCommit()
+class ChannelCallback implements Callback {
+  private static final Logger log = LoggerFactory.getLogger(ChannelCallback.class);
+  private int index;
+  private long startTime;
+
+  public ChannelCallback(int index, long startTime) {
+    this.index = index;
+    this.startTime = startTime;
+  }
+
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    if (exception != null) {
+      log.trace("Error sending message to Kafka due to " + exception.getMessage());
+    }
+    if (log.isDebugEnabled()) {
+      long batchElapsedTime = System.currentTimeMillis() - startTime;
+      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" +
+              metadata.offset() + "-" + batchElapsedTime);
+    }
+  }
+}
+
+class ChannelRebalanceListener implements ConsumerRebalanceListener {
+  private static final Logger log = LoggerFactory.getLogger(ChannelRebalanceListener.class);
+  private AtomicBoolean rebalanceFlag;
+
+  public ChannelRebalanceListener(AtomicBoolean rebalanceFlag) {
+    this.rebalanceFlag = rebalanceFlag;
+  }
+
+  // Set a flag that a rebalance has occurred. Then we can commit the currently written transactions
+  // on the next doTake() pass.
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+      rebalanceFlag.set(true);
+    }
+  }
+
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+    }
+  }
 }
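
For readers less familiar with the 0.9 consumer API used above: the old
ConsumerConnector/ConsumerIterator pair is replaced by a KafkaConsumer that
subscribes with a rebalance listener, polls with a configurable timeout
(kafka.pollTimeout), records the next offset for each partition as events are
taken, and commits those offsets explicitly with commitSync() when the Flume
transaction commits. The standalone sketch below only illustrates that
pattern; the broker address, topic name, loop bound and class name are
placeholders, not taken from the commit.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollCommitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("group.id", "flume");                    // DEFAULT_GROUP_ID in the diff
    props.put("enable.auto.commit", "false");          // the channel commits offsets itself
    props.put("auto.offset.reset", "earliest");        // DEFAULT_AUTO_OFFSET_RESET
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    final AtomicBoolean rebalanceFlag = new AtomicBoolean(false);
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Arrays.asList("flume-channel"), new ConsumerRebalanceListener() {
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // like ChannelRebalanceListener: remember that a rebalance happened
        rebalanceFlag.set(true);
      }
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    });

    Map<TopicPartition, OffsetAndMetadata> offsets =
        new HashMap<TopicPartition, OffsetAndMetadata>();
    for (int i = 0; i < 10; i++) {                      // bounded loop, sketch only
      if (rebalanceFlag.getAndSet(false)) {
        // the channel returns null from doTake() at this point so the open
        // transaction commits what it already holds before reading new data
        System.out.println("rebalance observed");
      }
      ConsumerRecords<String, byte[]> records = consumer.poll(500);  // pollTimeout analogue
      for (ConsumerRecord<String, byte[]> record : records) {
        // track the next offset to read per partition, as doTake() does
        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
      }
      if (!offsets.isEmpty()) {
        consumer.commitSync(offsets);                   // the "transaction commit"
        offsets.clear();
      }
    }
    consumer.close();
  }
}

Because enable.auto.commit is forced to false, nothing is considered consumed
until commitSync() succeeds, so events taken in a transaction that never
commits are re-read from Kafka after a restart.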

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 9a342ef..faf46b6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -18,27 +18,45 @@
  */
 package org.apache.flume.channel.kafka;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+
 public class KafkaChannelConfiguration {
 
   public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+  public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
+  public static final String DEFAULT_ACKS = "all";
+  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
+  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String DEFAULT_TOPIC = "flume-channel";
+  public static final String DEFAULT_GROUP_ID = "flume";
+  public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
+  public static final long DEFAULT_POLL_TIMEOUT = 500;
+
+  public static final String KEY_HEADER = "key";
+
+  public static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
+
+  public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
+  public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
+
+  /*** Old Configuration Parameters ****/
   public static final String BROKER_LIST_KEY = "metadata.broker.list";
   public static final String REQUIRED_ACKS_KEY = "request.required.acks";
   public static final String BROKER_LIST_FLUME_KEY = "brokerList";
-  public static final String TOPIC = "topic";
-  public static final String GROUP_ID = "group.id";
+  //public static final String TOPIC = "topic";
   public static final String GROUP_ID_FLUME = "groupId";
   public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
   public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
   public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
-  public static final String DEFAULT_GROUP_ID = "flume";
-  public static final String DEFAULT_TOPIC = "flume-channel";
   public static final String TIMEOUT = "timeout";
   public static final String DEFAULT_TIMEOUT = "100";
   public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
 
-  public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
-  public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
-
   public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
   public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
 }
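
The constants above define the new configuration surface: kafka.bootstrap.servers
and kafka.topic replace the old brokerList and topic parameters (the new consumer
no longer needs zookeeperConnect), kafka.pollTimeout bounds each consumer poll,
and anything under the kafka.consumer. / kafka.producer. prefixes is handed to
the respective client unchanged once the prefix is stripped (see setConsumerProps()
and setProducerProps() in KafkaChannel.java above). A minimal sketch of that
mapping, assuming placeholder values and an arbitrary pass-through property
(compression.type) that is not part of the commit:

import org.apache.flume.Context;

import java.util.Properties;

public class PrefixMappingSketch {
  public static void main(String[] args) {
    Context ctx = new Context();
    ctx.put("kafka.bootstrap.servers", "localhost:9092");   // BOOTSTRAP_SERVERS_CONFIG
    ctx.put("kafka.topic", "flume-channel");                 // TOPIC_CONFIG
    ctx.put("kafka.pollTimeout", "500");                     // POLL_TIMEOUT
    ctx.put("kafka.consumer.group.id", "flume");             // becomes consumer "group.id"
    ctx.put("kafka.producer.compression.type", "snappy");    // becomes producer "compression.type"

    // Everything after the "kafka.consumer." prefix goes to the KafkaConsumer as-is.
    Properties consumerProps = new Properties();
    consumerProps.putAll(ctx.getSubProperties("kafka.consumer."));
    consumerProps.put("bootstrap.servers", ctx.getString("kafka.bootstrap.servers"));

    // Likewise for the producer with the "kafka.producer." prefix.
    Properties producerProps = new Properties();
    producerProps.putAll(ctx.getSubProperties("kafka.producer."));
    producerProps.put("bootstrap.servers", ctx.getString("kafka.bootstrap.servers"));

    System.out.println("consumer: " + consumerProps);
    System.out.println("producer: " + producerProps);
  }
}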

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 319e779..637428d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -19,13 +19,8 @@
 package org.apache.flume.channel.kafka;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -33,17 +28,26 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.*;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
 public class TestKafkaChannel {
 
+  private final static Logger LOGGER =
+          LoggerFactory.getLogger(TestKafkaChannel.class);
+
   private static TestUtil testUtil = TestUtil.getInstance();
   private String topic = null;
   private final Set<String> usedTopics = new HashSet<String>();
@@ -78,11 +82,52 @@ public class TestKafkaChannel {
     testUtil.tearDown();
   }
 
+  //Make sure the props are picked up correctly.
+  @Test
+  public void testProps() throws Exception {
+    Context context = new Context();
+    context.put("kafka.producer.some-parameter", "1");
+    context.put("kafka.consumer.another-parameter", "1");
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(TOPIC_CONFIG, topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty("some-parameter"), "1");
+    Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
+  }
+
+  @Test
+  public void testOldConfig() throws Exception {
+    Context context = new Context();
+    context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl());
+    context.put(GROUP_ID_FLUME,"flume-something");
+    context.put(READ_SMALLEST_OFFSET,"true");
+    context.put("topic",topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl());
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something");
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+
+  }
+
+
   @Test
   public void testSuccess() throws Exception {
     doTestSuccessRollback(false, false);
   }
 
+
   @Test
   public void testSuccessInterleave() throws Exception {
     doTestSuccessRollback(false, true);
@@ -99,7 +144,7 @@ public class TestKafkaChannel {
   }
 
   private void doTestSuccessRollback(final boolean rollback,
-    final boolean interleave) throws Exception {
+                                     final boolean interleave) throws Exception {
     final KafkaChannel channel = startChannel(true);
     writeAndVerify(rollback, channel, interleave);
     channel.stop();
@@ -122,82 +167,89 @@ public class TestKafkaChannel {
   }
 
   @Test
-  public void testNoParsingAsFlumeAgent() throws Exception {
+  public void testParseAsFlumeEventFalse() throws Exception {
+    doParseAsFlumeEventFalse(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseCheckHeader() throws Exception {
+    doParseAsFlumeEventFalse(true);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSource() throws Exception {
+    doParseAsFlumeEventFalseAsSource(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
+    doParseAsFlumeEventFalseAsSource(true);
+  }
+
+  private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
-    Producer<String, byte[]> producer = new Producer<String, byte[]>(
-      new ProducerConfig(channel.getKafkaConf()));
-    List<KeyedMessage<String, byte[]>> original = Lists.newArrayList();
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
+
     for (int i = 0; i < 50; i++) {
-      KeyedMessage<String, byte[]> data = new KeyedMessage<String,
-        byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6),
-        String.valueOf(i).getBytes());
-      original.add(data);
+      ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes());
+      producer.send(data).get();
     }
-    producer.send(original);
     ExecutorCompletionService<Void> submitterSvc = new
-      ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
     List<Event> events = pullEvents(channel, submitterSvc,
-      50, false, false);
+            50, false, false);
     wait(submitterSvc, 5);
-    Set<Integer> finals = Sets.newHashSet();
+    Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.contains(i));
+      Assert.assertTrue(finals.keySet().contains(i));
+      if (checkHeaders) {
+        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+      }
       finals.remove(i);
     }
     Assert.assertTrue(finals.isEmpty());
     channel.stop();
   }
 
-  @Test
-  public void testTimeoutConfig() throws Exception {
-    Context context = prepareDefaultContext(true);
-    KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
-      .equals(DEFAULT_TIMEOUT));
-
-    String timeout = "1000";
-    context.put("kafka."+CONSUMER_TIMEOUT, timeout);
-    channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
-            .equals(timeout));
-  }
-
   /**
    * Like the previous test but here we write to the channel like a Flume source would do
    * to verify that the events are written as text and not as an Avro object
    *
    * @throws Exception
    */
-  @Test
-  public void testWritingToNoParsingAsFlumeAgent() throws Exception {
+  public void doParseAsFlumeEventFalseAsSource(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
 
     List<String> msgs = new ArrayList<String>();
-    for (int i = 0; i < 50; i++){
+    Map<String, String> headers = new HashMap<String, String>();
+    for (int i = 0; i < 50; i++) {
       msgs.add(String.valueOf(i));
     }
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for (int i = 0; i < msgs.size(); i++){
-      channel.put(EventBuilder.withBody(msgs.get(i).getBytes()));
+    for (int i = 0; i < msgs.size(); i++) {
+      headers.put(KEY_HEADER, String.valueOf(i) + "-header");
+      channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
     }
     tx.commit();
     ExecutorCompletionService<Void> submitterSvc = new
             ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
     List<Event> events = pullEvents(channel, submitterSvc,
-      50, false, false);
+            50, false, false);
     wait(submitterSvc, 5);
-    Set<Integer> finals = Sets.newHashSet();
+    Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.contains(i));
+      Assert.assertTrue(finals.keySet().contains(i));
+      if (checkHeaders) {
+        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+      }
       finals.remove(i);
     }
     Assert.assertTrue(finals.isEmpty());
@@ -216,12 +268,12 @@ public class TestKafkaChannel {
    * @throws Exception
    */
   private void doTestStopAndStart(boolean rollback,
-    boolean retryAfterRollback) throws Exception {
+                                  boolean retryAfterRollback) throws Exception {
     final KafkaChannel channel = startChannel(true);
     ExecutorService underlying = Executors
-      .newCachedThreadPool();
+            .newCachedThreadPool();
     ExecutorCompletionService<Void> submitterSvc =
-      new ExecutorCompletionService<Void>(underlying);
+            new ExecutorCompletionService<Void>(underlying);
     final List<List<Event>> events = createBaseList();
     putEvents(channel, events, submitterSvc);
     int completed = 0;
@@ -233,14 +285,14 @@ public class TestKafkaChannel {
       total = 40;
     }
     final List<Event> eventsPulled =
-      pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
+            pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
     wait(submitterSvc, 5);
     channel2.stop();
     if (!retryAfterRollback && rollback) {
       final KafkaChannel channel3 = startChannel(true);
       int expectedRemaining = 50 - eventsPulled.size();
       final List<Event> eventsPulled2 =
-        pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
+              pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
       wait(submitterSvc, 5);
       Assert.assertEquals(expectedRemaining, eventsPulled2.size());
       eventsPulled.addAll(eventsPulled2);
@@ -259,18 +311,18 @@ public class TestKafkaChannel {
   }
 
   private void writeAndVerify(final boolean testRollbacks,
-    final KafkaChannel channel) throws Exception {
+                              final KafkaChannel channel) throws Exception {
     writeAndVerify(testRollbacks, channel, false);
   }
 
   private void writeAndVerify(final boolean testRollbacks,
-    final KafkaChannel channel, final boolean interleave) throws Exception {
+                              final KafkaChannel channel, final boolean interleave) throws Exception {
 
     final List<List<Event>> events = createBaseList();
 
     ExecutorCompletionService<Void> submitterSvc =
-      new ExecutorCompletionService<Void>(Executors
-        .newCachedThreadPool());
+            new ExecutorCompletionService<Void>(Executors
+                    .newCachedThreadPool());
 
     putEvents(channel, events, submitterSvc);
 
@@ -279,11 +331,11 @@ public class TestKafkaChannel {
     }
 
     ExecutorCompletionService<Void> submitterSvc2 =
-      new ExecutorCompletionService<Void>(Executors
-        .newCachedThreadPool());
+            new ExecutorCompletionService<Void>(Executors
+                    .newCachedThreadPool());
 
     final List<Event> eventsPulled =
-      pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+            pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
 
     if (!interleave) {
       wait(submitterSvc, 5);
@@ -301,7 +353,7 @@ public class TestKafkaChannel {
       for (int j = 0; j < 10; j++) {
         Map<String, String> hdrs = new HashMap<String, String>();
         String v = (String.valueOf(i) + " - " + String
-          .valueOf(j));
+                .valueOf(j));
         hdrs.put("header", v);
         eventList.add(EventBuilder.withBody(v.getBytes(), hdrs));
       }
@@ -310,7 +362,7 @@ public class TestKafkaChannel {
   }
 
   private void putEvents(final KafkaChannel channel, final List<List<Event>>
-    events, ExecutorCompletionService<Void> submitterSvc) {
+          events, ExecutorCompletionService<Void> submitterSvc) {
     for (int i = 0; i < 5; i++) {
       final int index = i;
       submitterSvc.submit(new Callable<Void>() {
@@ -334,10 +386,10 @@ public class TestKafkaChannel {
   }
 
   private List<Event> pullEvents(final KafkaChannel channel,
-    ExecutorCompletionService<Void> submitterSvc, final int total,
-    final boolean testRollbacks, final boolean retryAfterRollback) {
+                                 ExecutorCompletionService<Void> submitterSvc, final int total,
+                                 final boolean testRollbacks, final boolean retryAfterRollback) {
     final List<Event> eventsPulled = Collections.synchronizedList(new
-      ArrayList<Event>(50));
+            ArrayList<Event>(50));
     final CyclicBarrier barrier = new CyclicBarrier(5);
     final AtomicInteger counter = new AtomicInteger(0);
     final AtomicInteger rolledBackCount = new AtomicInteger(0);
@@ -366,9 +418,9 @@ public class TestKafkaChannel {
                 eventsLocal.add(e);
               } else {
                 if (testRollbacks &&
-                  index == 4 &&
-                  (!rolledBack.get()) &&
-                  startedGettingEvents.get()) {
+                        index == 4 &&
+                        (!rolledBack.get()) &&
+                        startedGettingEvents.get()) {
                   tx.rollback();
                   tx.close();
                   tx = null;
@@ -407,7 +459,7 @@ public class TestKafkaChannel {
   }
 
   private void wait(ExecutorCompletionService<Void> submitterSvc, int max)
-    throws Exception {
+          throws Exception {
     int completed = 0;
     while (completed < max) {
       submitterSvc.take();
@@ -420,8 +472,7 @@ public class TestKafkaChannel {
     Assert.assertEquals(50, eventsPulled.size());
     Set<String> eventStrings = new HashSet<String>();
     for (Event e : eventsPulled) {
-      Assert
-        .assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
+      Assert.assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
       eventStrings.add(e.getHeaders().get("header"));
     }
     for (int i = 0; i < 5; i++) {
@@ -437,14 +488,10 @@ public class TestKafkaChannel {
   private Context prepareDefaultContext(boolean parseAsFlume) {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
-    context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY,
-      testUtil.getKafkaServerUrl());
-    context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY,
-      testUtil.getZkUrl());
-    context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT,
-      String.valueOf(parseAsFlume));
-    context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true");
-    context.put(KafkaChannelConfiguration.TOPIC, topic);
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
+    context.put(TOPIC_CONFIG, topic);
+
     return context;
   }
 
@@ -452,22 +499,18 @@ public class TestKafkaChannel {
     int numPartitions = 5;
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
-      sessionTimeoutMs, connectionTimeoutMs,
-      ZKStringSerializer$.MODULE$);
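+    // the fourth argument toggles ZooKeeper security (ACLs); disabled here for the embedded test broker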
+    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
 
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkClient, topicName, numPartitions,
-      replicationFactor, topicConfig);
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
+            replicationFactor, topicConfig);
   }
 
   public static void deleteTopic(String topicName) {
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
-      sessionTimeoutMs, connectionTimeoutMs,
-      ZKStringSerializer$.MODULE$);
-    AdminUtils.deleteTopic(zkClient, topicName);
+    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    AdminUtils.deleteTopic(zkUtils, topicName);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index c10c89d..216bfd8 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -38,7 +38,7 @@ port=9092
 #advertised.port=<port accessible by clients>
 
 # The number of threads handling network requests
-num.network.threads=2
+num.network.threads=4
 
 # The number of threads doing disk I/O
 num.io.threads=8

http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 15f27c3..5149ab5 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2679,36 +2679,60 @@ The Kafka channel can be used for multiple scenarios:
 * With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
 * With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr
 
+
+This version of Flume requires Kafka version 0.9 or greater due to its reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.
+
+The configuration parameters are organized as follows (see the sketch after this list):
+
+1) Configuration values related to the channel itself are applied at the channel config level, e.g.: a1.channels.k1.type
+2) Configuration values related to Kafka or to how the channel operates are prefixed with "kafka." (these are analogous to the Kafka CommonClientConfigs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates.
+3) Properties specific to the producer or consumer are prefixed by kafka.producer or kafka.consumer
+4) Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks
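+
+For example, the conventions above map onto concrete properties roughly as shown below (an illustrative sketch for a hypothetical channel ``k1`` on agent ``a1``; the broker addresses and producer/consumer values are placeholders):
+
+.. code-block:: properties
+
+    # channel-level setting, no prefix
+    a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
+    # Kafka / common client settings, "kafka." prefix
+    a1.channels.k1.kafka.topic = channel1
+    a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
+    # producer- and consumer-specific settings
+    a1.channels.k1.kafka.producer.acks = 1
+    a1.channels.k1.kafka.consumer.auto.offset.reset = earliest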
+
+This version of Flume is backwards-compatible with configurations from previous Flume versions; however, deprecated properties are indicated in the second table below, and a warning message
+is logged on startup when they are present in the configuration file.
+
 Required properties are in **bold**.
 
-======================  ==========================  ===============================================================================================================
-Property Name           Default                           Description
-======================  ==========================  ===============================================================================================================
-**type**                --                          The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
-**brokerList**          --                          List of brokers in the Kafka cluster used by the channel
-                                                    This can be a partial list of brokers, but we recommend at least two for HA.
-                                                    The format is comma separated list of hostname:port
-**zookeeperConnect**    --                          URI of ZooKeeper used by Kafka cluster
-                                                    The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
-                                                    For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
-topic                   flume-channel               Kafka topic which the channel will use
-groupId                 flume                       Consumer group ID the channel uses to register with Kafka.
-                                                    Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
-                                                    Note that having non-channel consumers with the same ID can lead to data loss.
-parseAsFlumeEvent       true                        Expecting Avro datums with FlumeEvent schema in the channel.
-                                                    This should be true if Flume source is writing to the channel
-                                                    And false if other producers are writing into the topic that the channel is using
-                                                    Flume source messages to Kafka can be parsed outside of Flume by using
-                                                    org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
-readSmallestOffset      false                       When set to true, the channel will read all data in the topic, starting from the oldest event
-                                                    when false, it will read only events written after the channel started
-                                                    When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this
-                                                    guarantees that events sent by source before sinks start will not be lost.
-Other Kafka Properties  --                          These properties are used to configure the Kafka Producer and Consumer used by the channel.
-                                                    Any property supported by Kafka can be used.
-                                                    The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                                    For example: kafka.producer.type
-======================  ==========================  ===============================================================================================================
+================================  ==========================  ===============================================================================================================
+Property Name                     Default                     Description
+================================  ==========================  ===============================================================================================================
+**type**                          --                          The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**kafka.bootstrap.servers**       --                          List of brokers in the Kafka cluster used by the channel
+                                                              This can be a partial list of brokers, but we recommend at least two for HA.
+                                                              The format is a comma-separated list of hostname:port
+kafka.topic                       flume-channel               Kafka topic which the channel will use
+kafka.consumer.group.id           flume                       Consumer group ID the channel uses to register with Kafka.
+                                                              Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
+                                                              Note that having non-channel consumers with the same ID can lead to data loss.
+
+parseAsFlumeEvent                 true                        Expects Avro datums with the FlumeEvent schema in the channel.
+                                                              This should be true if a Flume source is writing to the channel and false if other producers are
+                                                              writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
+                                                              org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
+pollTimeout                       500                         The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
+                                                              https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+kafka.consumer.auto.offset.reset  latest                      What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
+                                                              (e.g. because that data has been deleted):
+                                                              earliest: automatically reset the offset to the earliest offset
+                                                              latest: automatically reset the offset to the latest offset
+                                                              none: throw exception to the consumer if no previous offset is found for the consumer's group
+                                                              anything else: throw exception to the consumer.
+================================  ==========================  ===============================================================================================================
+
+Deprecated Properties
+
+================================  ==========================  ===============================================================================================================
+Property Name                     Default                     Description
+================================  ==========================  ===============================================================================================================
+brokerList                        --                          List of brokers in the Kafka cluster used by the channel
+                                                              This can be a partial list of brokers, but we recommend at least two for HA.
+                                                              The format is a comma-separated list of hostname:port. Use kafka.bootstrap.servers
+topic                             flume-channel               Use kafka.topic
+groupId                           flume                       Use kafka.consumer.group.id
+readSmallestOffset                false                       Use kafka.consumer.auto.offset.reset
+
+================================  ==========================  ===============================================================================================================
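+
+As a rough sketch of the migration (with placeholder agent/channel names and broker addresses), a configuration that uses the deprecated names is still accepted but logs a warning at startup; the preferred new-style equivalents are shown alongside:
+
+.. code-block:: properties
+
+    # before (deprecated names; a warning is logged at startup)
+    a1.channels.k1.brokerList = kafka-2:9092,kafka-3:9092
+    a1.channels.k1.topic = channel1
+
+    # after (preferred names)
+    a1.channels.k1.kafka.bootstrap.servers = kafka-2:9092,kafka-3:9092
+    a1.channels.k1.kafka.topic = channel1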
 
 .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up
 
@@ -2716,12 +2740,12 @@ Example for agent named a1:
 
 .. code-block:: properties
 
-    a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
     a1.channels.channel1.capacity = 10000
     a1.channels.channel1.transactionCapacity = 1000
-    a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
-    a1.channels.channel1.topic=channel1
-    a1.channels.channel1.zookeeperConnect=kafka-1:2181
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
 
 File Channel
 ~~~~~~~~~~~~