Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/29 19:44:36 UTC

[2/3] flume git commit: FLUME-2822: Flume-Kafka-Sink with new Producer

FLUME-2822: Flume-Kafka-Sink with new Producer

(Jeff Holoman via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7f588e6a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7f588e6a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7f588e6a

Branch: refs/heads/trunk
Commit: 7f588e6a158f5d108e39f50a92f8d1d108b12c24
Parents: f8abaf7
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:43:40 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:43:40 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  49 ++--
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   7 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 224 +++++++++++++++----
 .../flume/sink/kafka/KafkaSinkConstants.java    |  38 ++--
 .../apache/flume/sink/kafka/KafkaSinkUtil.java  | 103 ---------
 .../flume/sink/kafka/KafkaSinkUtilTest.java     |  55 -----
 .../apache/flume/sink/kafka/TestKafkaSink.java  |  74 +++++-
 7 files changed, 310 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 341ae42..15f27c3 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2493,7 +2493,9 @@ Kafka Sink
 This is a Flume Sink implementation that can publish data to a
 `Kafka <http://kafka.apache.org/>`_ topic. One of the objectives is to integrate Flume
 with Kafka so that pull-based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of releases.
+through various Flume sources. This currently supports Kafka 0.9.x series of releases.
+
+This version of Flume no longer supports older versions (0.8.x) of Kafka.
 
 Required properties are marked in bold font.
 
@@ -2502,20 +2504,21 @@ Required properties are marked in bold font.
 Property Name                    Default              Description
 ===============================  ===================  =============================================================================================
 **type**                         --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
-**brokerList**                   --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+**kafka.bootstrap.servers**      --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
                                                       This can be a partial list of brokers, but we recommend at least two for HA.
                                                       The format is comma separated list of hostname:port
-topic                            default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
+kafka.topic                      default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
                                                       messages will be published to this topic.
                                                       If the event header contains a "topic" field, the event will be published to that topic
                                                       overriding the topic configured here.
-batchSize                        100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
-requiredAcks                     1                    How many replicas must acknowledge a message before its considered successfully written.
+flumeBatchSize                   100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
+kafka.producer.acks              1                    How many replicas must acknowledge a message before it is considered successfully written.
                                                       Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
                                                       Set this to -1 to avoid data loss in some cases of leader failure.
 Other Kafka Producer Properties  --                   These properties are used to configure the Kafka Producer. Any producer property supported
-                                                      by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                                      For example: kafka.producer.type
+                                                      by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                      ``kafka.producer``.
+                                                      For example: kafka.producer.linger.ms
 ===============================  ===================  =============================================================================================
 
 .. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
@@ -2523,22 +2526,38 @@ Other Kafka Producer Properties  --                   These properties are used
             If ``key`` exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key
             will be sent to the same partition. If the key is null, events will be sent to random partitions.
 
+The Kafka sink also provides defaults for the key.serializer (org.apache.kafka.common.serialization.StringSerializer)
+and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
+
+Deprecated Properties
+
+===============================  ===================  =============================================================================================
+Property Name                    Default              Description
+===============================  ===================  =============================================================================================
+brokerList                       --                   Use kafka.bootstrap.servers
+topic                            default-flume-topic  Use kafka.topic
+batchSize                        100                  Use flumeBatchSize
+requiredAcks                     1                    Use kafka.producer.acks
+===============================  ===================  =============================================================================================
+
 An example configuration of a Kafka sink is given below. Properties starting
-with the prefix ``kafka`` (the last 3 properties) are used when instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix ``kafka.producer`` are used when instantiating the Kafka producer. The properties that are passed when creating the Kafka
 producer are not limited to the properties given in this example.
-Also it's possible include your custom properties here and access them inside
+Also it is possible to include your custom properties here and access them inside
 the preprocessor through the Flume Context object passed in as a method
 argument.
 
 .. code-block:: properties
 
-    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
-    a1.sinks.k1.topic = mytopic
-    a1.sinks.k1.brokerList = localhost:9092
-    a1.sinks.k1.requiredAcks = 1
-    a1.sinks.k1.batchSize = 20
     a1.sinks.k1.channel = c1
+    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.k1.kafka.topic = mytopic
+    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
+    a1.sinks.k1.flumeBatchSize = 20
+    a1.sinks.k1.kafka.producer.acks = 1
+    a1.sinks.k1.kafka.producer.linger.ms = 1
+    a1.sinks.k1.kafka.producer.compression.type = snappy
 
 Custom Sink
 ~~~~~~~~~~~

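As the updated user guide text above notes, routing is driven by the FlumeEvent headers: a "topic" header overrides the configured kafka.topic, and events sharing the same "key" header land on the same Kafka partition. The following is a minimal, illustrative Java sketch (not part of this commit) of how an upstream component might set those headers; the topic name, key, and payload are placeholders:

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class TopicHeaderExample {
      public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        // the "topic" header overrides the kafka.topic configured on the sink
        headers.put("topic", "my-dynamic-topic");
        // events with the same "key" header are routed to the same partition
        headers.put("key", "customer-42");

        // build the event; it would normally reach the sink through a source and channel
        Event event = EventBuilder.withBody(
            "example payload".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders());
      }
    }
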
http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 8475aa1..195c921 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -72,7 +72,14 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
+      <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 38b854b..2e2140e 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -19,20 +19,46 @@
 package org.apache.flume.sink.kafka;
 
 import com.google.common.base.Throwables;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Properties;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_HEADER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
+
 
 /**
  * A Flume Sink that can publish messages to Kafka.
@@ -67,16 +93,25 @@ import java.util.ArrayList;
 public class KafkaSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
-  public static final String KEY_HDR = "key";
-  public static final String TOPIC_HDR = "topic";
-  private Properties kafkaProps;
-  private Producer<String, byte[]> producer;
+
+  private final Properties kafkaProps = new Properties();
+  private KafkaProducer<String, byte[]> producer;
+
   private String topic;
   private int batchSize;
-  private List<KeyedMessage<String, byte[]>> messageList;
+  private List<Future<RecordMetadata>> kafkaFutures;
   private KafkaSinkCounter counter;
 
 
+  //For testing
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
     Status result = Status.READY;
@@ -92,7 +127,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
       transaction = channel.getTransaction();
       transaction.begin();
 
-      messageList.clear();
+      kafkaFutures.clear();
+      long batchStartTime = System.nanoTime();
       for (; processedEvents < batchSize; processedEvents += 1) {
         event = channel.take();
 
@@ -110,11 +146,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
         byte[] eventBody = event.getBody();
         Map<String, String> headers = event.getHeaders();
 
-        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
+        eventTopic = headers.get(TOPIC_HEADER);
+        if (eventTopic == null) {
           eventTopic = topic;
         }
-
-        eventKey = headers.get(KEY_HDR);
+        eventKey = headers.get(KEY_HEADER);
 
         if (logger.isDebugEnabled()) {
           logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
@@ -123,19 +159,22 @@ public class KafkaSink extends AbstractSink implements Configurable {
         }
 
         // create a message and add to buffer
-        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
-          (eventTopic, eventKey, eventBody);
-        messageList.add(data);
-
+        long startTime = System.currentTimeMillis();
+        kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, eventBody),
+                                         new SinkCallback(startTime)));
       }
 
+      //Prevent linger.ms from holding the batch
+      producer.flush();
+
       // publish batch and commit.
       if (processedEvents > 0) {
-        long startTime = System.nanoTime();
-        producer.send(messageList);
+          for (Future<RecordMetadata> future : kafkaFutures) {
+            future.get();
+          }
         long endTime = System.nanoTime();
-        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
-        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
+        counter.addToKafkaEventSendTimer((endTime-batchStartTime)/(1000*1000));
+        counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
       }
 
       transaction.commit();
@@ -146,6 +185,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
       result = Status.BACKOFF;
       if (transaction != null) {
         try {
+          kafkaFutures.clear();
           transaction.rollback();
           counter.incrementRollbackCount();
         } catch (Exception e) {
@@ -166,8 +206,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
   @Override
   public synchronized void start() {
     // instantiate the producer
-    ProducerConfig config = new ProducerConfig(kafkaProps);
-    producer = new Producer<String, byte[]>(config);
+    producer = new KafkaProducer<String,byte[]>(kafkaProps);
     counter.start();
     super.start();
   }
@@ -197,31 +236,132 @@ public class KafkaSink extends AbstractSink implements Configurable {
   @Override
   public void configure(Context context) {
 
-    batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
-      KafkaSinkConstants.DEFAULT_BATCH_SIZE);
-    messageList =
-      new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
-    logger.debug("Using batch size: {}", batchSize);
-
-    topic = context.getString(KafkaSinkConstants.TOPIC,
-      KafkaSinkConstants.DEFAULT_TOPIC);
-    if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
-      logger.warn("The Property 'topic' is not set. " +
-        "Using the default topic name: " +
-        KafkaSinkConstants.DEFAULT_TOPIC);
-    } else {
-      logger.info("Using the static topic: " + topic +
-        " this may be over-ridden by event headers");
+    translateOldProps(context);
+
+    String topicStr = context.getString(TOPIC_CONFIG);
+    if (topicStr == null || topicStr.isEmpty()) {
+      topicStr = DEFAULT_TOPIC;
+      logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
+    }
+    else {
+      logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
+    }
+
+    topic = topicStr;
+
+    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Using batch size: {}", batchSize);
+    }
+
+    kafkaFutures = new LinkedList<Future<RecordMetadata>>();
+
+    String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG);
+    if (bootStrapServers == null || bootStrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
-    kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
+    setProducerProps(context, bootStrapServers);
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Kafka producer properties: " + kafkaProps);
+      logger.debug("Kafka producer properties: {}" , kafkaProps);
     }
 
     if (counter == null) {
       counter = new KafkaSinkCounter(getName());
     }
   }
+
+  private void translateOldProps(Context ctx) {
+
+    if (!(ctx.containsKey(TOPIC_CONFIG))) {
+      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
+      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
+    }
+
+    //Broker List
+    // If there is no value we need to check and set the old param and log a warning message
+    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
+      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+      if (brokerList == null || brokerList.isEmpty()) {
+        throw new ConfigurationException("Bootstrap Servers must be specified");
+      } else {
+        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+      }
+    }
+
+    //batch Size
+    if (!(ctx.containsKey(BATCH_SIZE))) {
+      String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
+      if ( oldBatchSize != null  && !oldBatchSize.isEmpty())  {
+        ctx.put(BATCH_SIZE, oldBatchSize);
+        logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
+      }
+    }
+
+    // Acks
+    if (!(ctx.containsKey(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG))) {
+      String requiredKey = ctx.getString(
+              KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
+      if (!(requiredKey == null) && !(requiredKey.isEmpty())) {
+        ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
+        logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
+                KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
+      }
+    }
+
+    if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
+      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
+              "a different interface for serializers. Please use the parameter {}",
+              KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+    }
+
+    if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
+      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
+                      "a different interface for serializers. Please use the parameter {}",
+              MESSAGE_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+    }
+  }
+
+  private void setProducerProps(Context context, String bootStrapServers) {
+    kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+    //Defaults overridden based on config
+    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
+    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
+    kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
+    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    logger.info("Producer properties: {}" , kafkaProps.toString());
+  }
+
+  protected Properties getKafkaProps() {
+    return kafkaProps;
+  }
+}
+
+class SinkCallback implements Callback {
+  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
+  private long startTime;
+
+  public SinkCallback(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    if (exception != null) {
+      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
+    }
+
+    if (logger.isDebugEnabled() && metadata != null) {
+      long eventElapsedTime = System.currentTimeMillis() - startTime;
+      logger.debug("Acked message partition:{} offset:{}", metadata.partition(), metadata.offset());
+      logger.debug("Elapsed time for send: {}", eventElapsedTime);
+    }
+  }
 }
+
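The central change in KafkaSink.process() above is the move from the old Scala producer's synchronous send(List) to the new kafka-clients API: each event is submitted through producer.send() with a callback, producer.flush() is called so that linger.ms cannot hold the batch back, and the returned futures are resolved before the Flume transaction commits. A standalone sketch of that pattern is shown below; the broker address, topic, and payloads are placeholders rather than values taken from this commit:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;

    public class NewProducerBatchSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder broker list; the sink reads this from kafka.bootstrap.servers
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>();

        // queue a small batch; each send() returns a Future immediately
        for (int i = 0; i < 3; i++) {
          ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
              "default-flume-topic", "key-" + i, ("event-" + i).getBytes("UTF-8"));
          futures.add(producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
              // per-record bookkeeping, analogous to SinkCallback above
            }
          }));
        }

        // flush so linger.ms cannot delay the batch, then block until every send is acknowledged
        producer.flush();
        for (Future<RecordMetadata> future : futures) {
          future.get();
        }
        producer.close();
      }
    }
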

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 3ee12de..c84dec0 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -18,30 +18,42 @@
 
 package org.apache.flume.sink.kafka;
 
-import kafka.serializer.StringDecoder;
+import org.apache.kafka.clients.CommonClientConfigs;
 
 public class KafkaSinkConstants {
 
-  public static final String PROPERTY_PREFIX = "kafka.";
+  public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
 
   /* Properties */
 
-  public static final String TOPIC = "topic";
-  public static final String BATCH_SIZE = "batchSize";
+  public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
+  public static final String BATCH_SIZE = "flumeBatchSize";
+  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+  public static final String KEY_HEADER = "key";
+  public static final String TOPIC_HEADER = "topic";
+
+  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+  public static final int DEFAULT_BATCH_SIZE = 100;
+  public static final String DEFAULT_TOPIC = "default-flume-topic";
+  public static final String DEFAULT_ACKS = "1";
+
+  /* Old Properties */
+
+  public static final String OLD_BATCH_SIZE = "batchSize";
   public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
   public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
-  public static final String BROKER_LIST_KEY = "metadata.broker.list";
-  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
   public static final String BROKER_LIST_FLUME_KEY = "brokerList";
   public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
 
 
 
-  public static final int DEFAULT_BATCH_SIZE = 100;
-  public static final String DEFAULT_TOPIC = "default-flume-topic";
-  public static final String DEFAULT_MESSAGE_SERIALIZER =
-          "kafka.serializer.DefaultEncoder";
-  public static final String DEFAULT_KEY_SERIALIZER =
-          "kafka.serializer.StringEncoder";
-  public static final String DEFAULT_REQUIRED_ACKS = "1";
 }
+

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
deleted file mode 100644
index 66bde85..0000000
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kafka;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.util.PropertiesTrait;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaSinkUtil {
-
-  private static final Logger log =
-          LoggerFactory.getLogger(KafkaSinkUtil.class);
-
-  public static Properties getKafkaProperties(Context context) {
-    log.info("context={}",context.toString());
-    Properties props =  generateDefaultKafkaProps();
-    setKafkaProps(context, props);
-    addDocumentedKafkaProps(context, props);
-    return props;
-  }
-
-  /**
-   * Some of the producer properties are especially important
-   * We documented them and gave them a camel-case name to match Flume config
-   * If user set these, we will override any existing parameters with these
-   * settings.
-   * Knowledge of which properties are documented is maintained here for now.
-   * If this will become a maintenance issue we'll set a proper data structure.
-   */
-  private static void addDocumentedKafkaProps(Context context,
-                                              Properties kafkaProps)
-          throws ConfigurationException {
-    String brokerList = context.getString(KafkaSinkConstants
-            .BROKER_LIST_FLUME_KEY);
-    if (brokerList == null) {
-      throw new ConfigurationException("brokerList must contain at least " +
-              "one Kafka broker");
-    }
-    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
-
-    String requiredKey = context.getString(
-            KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
-
-    if (requiredKey != null ) {
-      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
-    }
-  }
-
-
-  /**
-   * Generate producer properties object with some defaults
-   * @return
-   */
-  private static Properties generateDefaultKafkaProps() {
-    Properties props = new Properties();
-    props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
-            KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
-    props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
-            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
-    props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
-            KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);
-    return props;
-  }
-
-
-  /**
-   * Add all configuration parameters starting with "kafka"
-   * to producer properties
-   */
-  private static void setKafkaProps(Context context, Properties kafkaProps) {
-
-    Map<String,String> kafkaProperties =
-            context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
-
-    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
-
-      kafkaProps.put(prop.getKey(), prop.getValue());
-      if (log.isDebugEnabled()) {
-        log.debug("Reading a Kafka Producer Property: key: "
-                + prop.getKey() + ", value: " + prop.getValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
deleted file mode 100644
index 84d213c..0000000
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kafka;
-
-import junit.framework.TestCase;
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurables;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaSinkUtilTest extends TestCase {
-
-  @Test
-  public void testGetKafkaProperties() {
-    Context context = new Context();
-    context.put("kafka.serializer.class", "override.default.serializer");
-    context.put("kafka.fake.property", "kafka.property.value");
-    context.put("kafka.metadata.broker.list","bad-broker-list");
-    context.put("brokerList","real-broker-list");
-    Properties kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
-
-    //check that we have defaults set
-    assertEquals(
-            kafkaProps.getProperty(KafkaSinkConstants.KEY_SERIALIZER_KEY),
-            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
-    //check that kafka properties override the default and get correct name
-    assertEquals(
-            kafkaProps.getProperty(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY),
-            "override.default.serializer");
-    //check that any kafka property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"),
-            "kafka.property.value");
-    //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("metadata.broker.list")
-            ,"real-broker-list");
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/7f588e6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 72117b1..1852099 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -19,11 +19,18 @@
 package org.apache.flume.sink.kafka;
 
 import kafka.message.MessageAndMetadata;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -33,12 +40,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.*;
+
 /**
  * Unit tests for Kafka Sink
  */
@@ -50,7 +59,7 @@ public class TestKafkaSink {
   public static void setup() {
     testUtil.prepare();
     List<String> topics = new ArrayList<String>(3);
-    topics.add(KafkaSinkConstants.DEFAULT_TOPIC);
+    topics.add(DEFAULT_TOPIC);
     topics.add(TestConstants.STATIC_TOPIC);
     topics.add(TestConstants.CUSTOM_TOPIC);
     testUtil.initTopicList(topics);
@@ -62,6 +71,50 @@ public class TestKafkaSink {
   }
 
   @Test
+  public void testKafkaProperties() {
+
+    KafkaSink kafkaSink = new KafkaSink();
+    Context context = new Context();
+    context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
+    context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "override.default.serializer");
+    context.put("kafka.producer.fake.property", "kafka.property.value");
+    context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
+    context.put("brokerList","real-broker-list");
+    Configurables.configure(kafkaSink,context);
+
+    Properties kafkaProps = kafkaSink.getKafkaProps();
+
+    //check that we have defaults set
+    assertEquals(
+            kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), DEFAULT_KEY_SERIALIZER);
+    //check that kafka properties override the default and get correct name
+    assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), "override.default.serializer");
+    //check that any kafka-producer property gets in
+    assertEquals(kafkaProps.getProperty("fake.property"), "kafka.property.value");
+    //check that documented property overrides defaults
+    assertEquals(kafkaProps.getProperty("bootstrap.servers") ,"localhost:9092,localhost:9092");
+  }
+
+  @Test
+  public void testOldProperties() {
+    KafkaSink kafkaSink = new KafkaSink();
+    Context context = new Context();
+    context.put("topic","test-topic");
+    context.put(OLD_BATCH_SIZE, "300");
+    context.put(BROKER_LIST_FLUME_KEY,"localhost:9092,localhost:9092");
+    context.put(REQUIRED_ACKS_FLUME_KEY, "all");
+    Configurables.configure(kafkaSink,context);
+
+    Properties kafkaProps = kafkaSink.getKafkaProps();
+
+    assertEquals(kafkaSink.getTopic(), "test-topic");
+    assertEquals(kafkaSink.getBatchSize(),300);
+    assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),"localhost:9092,localhost:9092");
+    assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
+
+  }
+
+  @Test
   public void testDefaultTopic() {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
@@ -89,7 +142,7 @@ public class TestKafkaSink {
     }
 
     String fetchedMsg = new String((byte[])
-      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)
+      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)
         .message());
     assertEquals(msg, fetchedMsg);
   }
@@ -98,7 +151,7 @@ public class TestKafkaSink {
   public void testStaticTopic() {
     Context context = prepareDefaultContext();
     // add the static topic
-    context.put(KafkaSinkConstants.TOPIC, TestConstants.STATIC_TOPIC);
+    context.put(TOPIC_CONFIG, TestConstants.STATIC_TOPIC);
     String msg = "static-topic-test";
 
     try {
@@ -110,8 +163,7 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(
-      TestConstants.STATIC_TOPIC).message());
+    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(TestConstants.STATIC_TOPIC).message());
     assertEquals(msg, fetchedMsg);
   }
 
@@ -172,16 +224,14 @@ public class TestKafkaSink {
       fail("Error Occurred");
     }
     assertNull(
-      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
+      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
   }
 
   private Context prepareDefaultContext() {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
-    context.put("brokerList", testUtil.getKafkaServerUrl());
-    context.put("kafka.request.required.acks", "1");
-    context.put("kafka.producer.type","sync");
-    context.put("batchSize", "1");
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(BATCH_SIZE, "1");
     return context;
   }