Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/15 23:28:00 UTC

git commit: FLUME-2251. Kafka Sink.

Repository: flume
Updated Branches:
  refs/heads/trunk acc965134 -> 75f748cbd


FLUME-2251. Kafka Sink.

(Thilina Buddhika, Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/75f748cb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/75f748cb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/75f748cb

Branch: refs/heads/trunk
Commit: 75f748cbd101d6efe8463a1c747fb87d2f668091
Parents: acc9651
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Sep 15 14:26:19 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Sep 15 14:26:19 2014 -0700

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |  68 ++++++
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 219 +++++++++++++++++++
 .../flume/sink/kafka/KafkaSinkConstants.java    |  31 +++
 .../apache/flume/sink/kafka/TestConstants.java  |  25 +++
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 212 ++++++++++++++++++
 .../flume/sink/kafka/util/KafkaConsumer.java    |  98 +++++++++
 .../flume/sink/kafka/util/KafkaLocal.java       |  52 +++++
 .../apache/flume/sink/kafka/util/TestUtil.java  | 174 +++++++++++++++
 .../flume/sink/kafka/util/ZooKeeperLocal.java   |  62 ++++++
 .../src/test/resources/kafka-server.properties  | 117 ++++++++++
 .../src/test/resources/log4j.properties         |  78 +++++++
 .../src/test/resources/zookeeper.properties     |  20 ++
 flume-ng-sinks/pom.xml                          |   1 +
 pom.xml                                         |   6 +
 15 files changed, 1167 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 8c18af6..ca3cd8b 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -150,6 +150,10 @@
       <artifactId>flume-ng-morphline-solr-sink</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-ng-kafka-sink</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-sources</groupId>
       <artifactId>flume-scribe-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
new file mode 100644
index 0000000..307fa59
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-ng-kafka-sink</artifactId>
+  <name>Flume Kafka Sink</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.1.1</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
new file mode 100644
index 0000000..a6121ac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -0,0 +1,219 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import com.google.common.base.Throwables;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.flume.*;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Flume Sink that can publish messages to Kafka.
+ * This is a general implementation that can be used with any Flume agent
+ * and any channel.
+ * The message body is the event body, and the key is a string read from
+ * the "key" event header.
+ * To use partitioning, add an interceptor that generates a header carrying
+ * the partition key.
+ * <p/>
+ * Mandatory properties:
+ * kafka.metadata.broker.list -- can be a partial list, but at least two
+ * brokers are recommended for HA
+ * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one
+ * broker), -1 (accepted by all brokers)
+ * kafka.producer.type -- for safety, this should be sync
+ * <p/>
+ * Any other property starting with "kafka." is passed through to the
+ * Kafka producer; see the Kafka producer documentation for the
+ * configurations it supports.
+ * <p/>
+ * Optional properties:
+ * topic -- there is a default, and the topic can also be set per event in
+ * the event header to support events with different topics
+ * batchSize -- how many messages to process in one batch. Larger batches
+ * improve throughput at the cost of added latency.
+ * <p/>
+ * Per-event header properties:
+ * topic
+ * key
+ */
+public class KafkaSink extends AbstractSink implements Configurable {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
+  public static final String KEY_HDR = "key";
+  public static final String TOPIC_HDR = "topic";
+  private Properties producerProps;
+  private Producer<String, byte[]> producer;
+  private String topic;
+  private int batchSize;
+  private List<KeyedMessage<String, byte[]>> messageList;
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    Status result = Status.READY;
+    Channel channel = getChannel();
+    Transaction transaction = null;
+    Event event = null;
+    String eventTopic = null;
+    String eventKey = null;
+
+    try {
+      long processedEvents = 0;
+
+      transaction = channel.getTransaction();
+      transaction.begin();
+
+      messageList.clear();
+      for (; processedEvents < batchSize; processedEvents += 1) {
+        event = channel.take();
+
+        if (event == null) {
+          // no events available in channel
+          break;
+        }
+
+        byte[] eventBody = event.getBody();
+        Map<String, String> headers = event.getHeaders();
+
+        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
+          eventTopic = topic;
+        }
+
+        eventKey = headers.get(KEY_HDR);
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
+            + new String(eventBody, "UTF-8"));
+          logger.debug("event #{}", processedEvents);
+        }
+
+        // create a message and add to buffer
+        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
+          (eventTopic, eventKey, eventBody);
+        messageList.add(data);
+
+      }
+
+      // publish batch and commit.
+      if (processedEvents > 0) {
+        producer.send(messageList);
+      }
+
+      transaction.commit();
+
+    } catch (Exception ex) {
+      String errorMsg = "Failed to publish events";
+      logger.error("Failed to publish events", ex);
+      result = Status.BACKOFF;
+      if (transaction != null) {
+        try {
+          transaction.rollback();
+        } catch (Exception e) {
+          logger.error("Transaction rollback failed", e);
+          throw Throwables.propagate(e);
+        }
+      }
+      throw new EventDeliveryException(errorMsg, ex);
+    } finally {
+      if (transaction != null) {
+        transaction.close();
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public synchronized void start() {
+    // instantiate the producer
+    ProducerConfig config = new ProducerConfig(producerProps);
+    producer = new Producer<String, byte[]>(config);
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    producer.close();
+    super.stop();
+  }
+
+  @Override
+  public void configure(Context context) {
+
+    batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
+      KafkaSinkConstants.DEFAULT_BATCH_SIZE);
+    logger.debug("Using batch size: {}", batchSize);
+    messageList =
+      new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
+    Map<String, String> params = context.getParameters();
+    logger.debug("all params: " + params.entrySet().toString());
+    setProducerProps(params);
+    // containsKey(), not contains(): Properties.contains() matches values
+    if (!producerProps.containsKey("serializer.class")) {
+      producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
+    }
+    if (!producerProps.containsKey("key.serializer.class")) {
+      producerProps.put("key.serializer.class",
+        "kafka.serializer.StringEncoder");
+    }
+
+    topic = context.getString(KafkaSinkConstants.TOPIC,
+      KafkaSinkConstants.DEFAULT_TOPIC);
+    if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
+      logger.warn("The property 'topic' is not set. " +
+        "Using the default topic name: " +
+        KafkaSinkConstants.DEFAULT_TOPIC);
+    } else {
+      logger.info("Using the static topic: " + topic +
+        "; this may be overridden by event headers");
+    }
+  }
+
+  private void setProducerProps(Map<String, String> params) {
+    producerProps = new Properties();
+    for (String key : params.keySet()) {
+      String value = params.get(key).trim();
+      key = key.trim();
+      if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) {
+        // strip the "kafka" prefix plus the "." separator that follows it
+        key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1);
+        producerProps.put(key.trim(), value);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Reading a Kafka Producer Property: key: " + key +
+            ", value: " + value);
+        }
+      }
+    }
+  }
+
+}
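
For reference, the sink javadoc above maps onto an agent configuration along
these lines -- a minimal sketch, assuming an agent named a1 with a channel c1
already defined (agent, sink, and broker names are illustrative, and the
fully qualified class name is used since no shorter alias appears to be
registered in this patch):

  a1.sinks = k1
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.k1.channel = c1
  # read by configure(); defaults are default-flume-topic and 100
  a1.sinks.k1.topic = my-flume-topic
  a1.sinks.k1.batchSize = 100
  # anything prefixed with "kafka." has the prefix stripped by
  # setProducerProps() and is handed to the Kafka producer
  a1.sinks.k1.kafka.metadata.broker.list = broker1:9092,broker2:9092
  a1.sinks.k1.kafka.request.required.acks = 1
  a1.sinks.k1.kafka.producer.type = sync

Per-event overrides come from the "topic" and "key" event headers, as the
process() loop above shows.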

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
new file mode 100644
index 0000000..48d875e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -0,0 +1,31 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+public class KafkaSinkConstants {
+
+  public static final String PROPERTY_PREFIX = "kafka";
+
+  /* Properties */
+  public static final String DEFAULT_TOPIC = "default-flume-topic";
+  public static final String TOPIC = "topic";
+  public static final String BATCH_SIZE = "batchSize";
+
+  public static final int DEFAULT_BATCH_SIZE = 100;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
new file mode 100644
index 0000000..f99be53
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -0,0 +1,25 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+public class TestConstants {
+  public static final String STATIC_TOPIC = "static-topic";
+  public static final String CUSTOM_KEY = "custom-key";
+  public static final String CUSTOM_TOPIC = "custom-topic";
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
new file mode 100644
index 0000000..aed6dac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -0,0 +1,212 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import kafka.message.MessageAndMetadata;
+import org.apache.flume.*;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.kafka.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for Kafka Sink
+ */
+public class TestKafkaSink {
+
+  private static TestUtil testUtil = TestUtil.getInstance();
+
+  @BeforeClass
+  public static void setup() {
+    testUtil.prepare();
+    List<String> topics = new ArrayList<String>(3);
+    topics.add(KafkaSinkConstants.DEFAULT_TOPIC);
+    topics.add(TestConstants.STATIC_TOPIC);
+    topics.add(TestConstants.CUSTOM_TOPIC);
+    testUtil.initTopicList(topics);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    testUtil.tearDown();
+  }
+
+  @Test
+  public void testDefaultTopic() {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "default-topic-test";
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes());
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    String fetchedMsg = new String((byte[])
+      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)
+        .message());
+    assertEquals(msg, fetchedMsg);
+  }
+
+  @Test
+  public void testStaticTopic() {
+    Context context = prepareDefaultContext();
+    // add the static topic
+    context.put(KafkaSinkConstants.TOPIC, TestConstants.STATIC_TOPIC);
+    String msg = "static-topic-test";
+
+    try {
+      Sink.Status status = prepareAndSend(context, msg);
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(
+      TestConstants.STATIC_TOPIC).message());
+    assertEquals(msg, fetchedMsg);
+  }
+
+  @Test
+  public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "my message";
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("topic", TestConstants.CUSTOM_TOPIC);
+    headers.put("key", TestConstants.CUSTOM_KEY);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata fetchedMsg =
+      testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+    assertEquals(TestConstants.CUSTOM_KEY,
+      new String((byte[]) fetchedMsg.key(), "UTF-8"));
+
+  }
+
+  @Test
+  public void testEmptyChannel() throws UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+    assertNull(
+      testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
+
+  }
+
+
+  private Context prepareDefaultContext() {
+    // Prepares a default context with Kafka Server Properties
+    Context context = new Context();
+    context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl());
+    context.put("kafka.request.required.acks", "1");
+    context.put("batchSize", "1");
+    return context;
+  }
+
+  private Sink.Status prepareAndSend(Context context, String msg)
+    throws EventDeliveryException {
+    Sink kafkaSink = new KafkaSink();
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes());
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    return kafkaSink.process();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
new file mode 100644
index 0000000..1c98922
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
@@ -0,0 +1,98 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Kafka consumer implementation that fetches the next message on the
+ * current thread instead of using a multi-threaded implementation, so each
+ * fetch is a synchronous, blocking call.
+ * To avoid waiting indefinitely, a consumer timeout (consumer.timeout.ms,
+ * one second here) bounds how long a fetch waits before concluding that no
+ * message is available.
+ */
+public class KafkaConsumer {
+
+  private static final Logger logger = LoggerFactory.getLogger(
+      KafkaConsumer.class);
+
+  private final ConsumerConnector consumer;
+  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
+
+  public KafkaConsumer() {
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+        createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1"));
+  }
+
+  private static ConsumerConfig createConsumerConfig(String zkUrl,
+      String groupId) {
+    Properties props = new Properties();
+    props.put("zookeeper.connect", zkUrl);
+    props.put("group.id", groupId);
+    props.put("zookeeper.session.timeout.ms", "400");
+    props.put("zookeeper.sync.time.ms", "200");
+    props.put("auto.commit.interval.ms", "1000");
+    props.put("auto.offset.reset", "smallest");
+    props.put("consumer.timeout.ms","1000");
+    return new ConsumerConfig(props);
+  }
+
+  public void initTopicList(List<String> topics) {
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    for (String topic : topics) {
+      // we need only single-threaded consumers: one stream per topic
+      topicCountMap.put(topic, Integer.valueOf(1));
+    }
+    consumerMap = consumer.createMessageStreams(topicCountMap);
+  }
+
+  public MessageAndMetadata getNextMessage(String topic) {
+    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+    // it has only a single stream, because there is only one consumer
+    KafkaStream<byte[], byte[]> stream = streams.get(0);
+    final ConsumerIterator<byte[], byte[]> it = stream.iterator();
+    try {
+      if (it.hasNext()) {
+        return it.next();
+      } else {
+        return null;
+      }
+    } catch (ConsumerTimeoutException e) {
+      logger.error("0 messages available to fetch for the topic " + topic);
+      return null;
+    }
+  }
+
+  public void shutdown() {
+    consumer.shutdown();
+  }
+}
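
A rough usage sketch for this test consumer (hypothetical class and topic
names; note the constructor reads its ZooKeeper URL from the TestUtil
singleton defined later in this patch, so the local servers must already be
running):

  import java.util.Collections;
  import kafka.message.MessageAndMetadata;
  import org.apache.flume.sink.kafka.util.KafkaConsumer;
  import org.apache.flume.sink.kafka.util.TestUtil;

  public class KafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
      // starts a local ZooKeeper + Kafka; the consumer below resolves its
      // ZooKeeper URL through this singleton
      TestUtil.getInstance().prepare();

      KafkaConsumer consumer = new KafkaConsumer();
      consumer.initTopicList(Collections.singletonList("test-topic"));

      // synchronous fetch: blocks until a message arrives or
      // consumer.timeout.ms (one second) elapses, then returns null
      MessageAndMetadata msg = consumer.getNextMessage("test-topic");
      System.out.println(msg == null
          ? "no message" : new String((byte[]) msg.message(), "UTF-8"));

      consumer.shutdown();
      TestUtil.getInstance().tearDown();
    }
  }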

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
new file mode 100644
index 0000000..3c6e064
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
@@ -0,0 +1,52 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Kafka server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class KafkaLocal {
+
+  public KafkaServerStartable kafka;
+  public ZooKeeperLocal zookeeper;
+
+  public KafkaLocal(Properties kafkaProperties) throws IOException,
+      InterruptedException {
+    KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+    // create the local Kafka broker; start() launches it
+    kafka = new KafkaServerStartable(kafkaConfig);
+  }
+
+  public void start() throws Exception {
+    kafka.startup();
+  }
+
+  public void stop() {
+    kafka.shutdown();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
new file mode 100644
index 0000000..8855c53
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -0,0 +1,174 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.message.MessageAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A utility class for starting and stopping a local Kafka server.
+ */
+public class TestUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
+  private static TestUtil instance = new TestUtil();
+
+  private Random randPortGen = new Random(System.currentTimeMillis());
+  private KafkaLocal kafkaServer;
+  private KafkaConsumer kafkaConsumer;
+  private String hostname = "localhost";
+  private int kafkaLocalPort;
+  private int zkLocalPort;
+
+  private TestUtil() {
+    init();
+  }
+
+  public static TestUtil getInstance() {
+    return instance;
+  }
+
+  private void init() {
+    // get the localhost.
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      logger.warn("Error getting the value of localhost. " +
+          "Proceeding with 'localhost'.", e);
+    }
+  }
+
+  private boolean startKafkaServer() {
+    Properties kafkaProperties = new Properties();
+    Properties zkProperties = new Properties();
+
+    try {
+      //load properties
+      zkProperties.load(TestUtil.class.getResourceAsStream(
+          "/zookeeper.properties"));
+
+      ZooKeeperLocal zookeeper;
+      while (true) {
+        //start local Zookeeper
+        try {
+          zkLocalPort = getNextPort();
+          // override the Zookeeper client port with the generated one.
+          zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort));
+          zookeeper = new ZooKeeperLocal(zkProperties);
+          break;
+        } catch (BindException bindEx) {
+          // bind exception. port is already in use. Try a different port.
+        }
+      }
+      logger.info("ZooKeeper instance is successfully started on port " +
+          zkLocalPort);
+
+      kafkaProperties.load(TestUtil.class.getResourceAsStream(
+          "/kafka-server.properties"));
+      // override the Zookeeper url.
+      kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
+      while (true) {
+        kafkaLocalPort = getNextPort();
+        // override the Kafka server port
+        kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+        kafkaServer = new KafkaLocal(kafkaProperties);
+        try {
+          kafkaServer.start();
+          break;
+        } catch (BindException bindEx) {
+          // let's try another port.
+        }
+      }
+      logger.info("Kafka Server is successfully started on port " +
+          kafkaLocalPort);
+      return true;
+
+    } catch (Exception e) {
+      logger.error("Error starting the Kafka Server.", e);
+      return false;
+    }
+  }
+
+  private KafkaConsumer getKafkaConsumer() {
+    synchronized (this) {
+      if (kafkaConsumer == null) {
+        kafkaConsumer = new KafkaConsumer();
+      }
+    }
+    return kafkaConsumer;
+  }
+
+  public void initTopicList(List<String> topics) {
+    getKafkaConsumer().initTopicList(topics);
+  }
+
+  public MessageAndMetadata getNextMessageFromConsumer(String topic) {
+    return getKafkaConsumer().getNextMessage(topic);
+  }
+
+  public void prepare() {
+    boolean startStatus = startKafkaServer();
+    if (!startStatus) {
+      throw new RuntimeException("Error starting the server!");
+    }
+    try {
+      Thread.sleep(3 * 1000);   // add this sleep time to
+      // ensure that the server is fully started before proceeding with tests.
+    } catch (InterruptedException e) {
+      // ignore
+    }
+    getKafkaConsumer();
+    logger.info("Completed the prepare phase.");
+  }
+
+  public void tearDown() {
+    logger.info("Shutting down the Kafka Consumer.");
+    getKafkaConsumer().shutdown();
+    try {
+      Thread.sleep(3 * 1000);   // add this sleep time to
+      // ensure the consumer has shut down cleanly before stopping the server.
+    } catch (InterruptedException e) {
+      // ignore
+    }
+    logger.info("Shutting down the kafka Server.");
+    kafkaServer.stop();
+    logger.info("Completed the tearDown phase.");
+  }
+
+  private synchronized int getNextPort() {
+    // generate a random port number in the range 49152 to 65534
+    return randPortGen.nextInt(65535 - 49152) + 49152;
+  }
+
+  public String getZkUrl() {
+    return hostname + ":" + zkLocalPort;
+  }
+
+  public String getKafkaServerUrl() {
+    return hostname + ":" + kafkaLocalPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
new file mode 100644
index 0000000..1a5728f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
@@ -0,0 +1,62 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Zookeeper server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class ZooKeeperLocal {
+
+  private static final Logger logger =
+    LoggerFactory.getLogger(ZooKeeperLocal.class);
+  private ZooKeeperServerMain zooKeeperServer;
+
+  public ZooKeeperLocal(Properties zkProperties) throws IOException {
+    QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+    try {
+      quorumConfiguration.parseProperties(zkProperties);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    zooKeeperServer = new ZooKeeperServerMain();
+    final ServerConfig configuration = new ServerConfig();
+    configuration.readFrom(quorumConfiguration);
+
+    // run the server on a background thread so the constructor
+    // returns immediately
+    new Thread() {
+      public void run() {
+        try {
+          zooKeeperServer.runFromConfig(configuration);
+        } catch (IOException e) {
+          logger.error("ZooKeeper startup failed.", e);
+        }
+      }
+    }.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
new file mode 100644
index 0000000..c07cdea
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=target/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=2
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bdcb643
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=target/logs
+
+log4j.rootLogger=INFO, stdout 
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
new file mode 100644
index 0000000..89e1b5e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=target
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 3381bde..4bac019 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -46,6 +46,7 @@ limitations under the License.
     <module>flume-ng-hbase-sink</module>
     <module>flume-ng-elasticsearch-sink</module>
     <module>flume-ng-morphline-solr-sink</module>
+    <module>flume-ng-kafka-sink</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 150db2e..740edc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1139,6 +1139,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sinks</groupId>
+        <artifactId>flume-ng-kafka-sink</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-sources</groupId>
         <artifactId>flume-scribe-source</artifactId>
         <version>1.6.0-SNAPSHOT</version>