You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/15 23:28:00 UTC
git commit: FLUME-2251. Kafka Sink.
Repository: flume
Updated Branches:
refs/heads/trunk acc965134 -> 75f748cbd
FLUME-2251. Kafka Sink.
(Thilina Buddhika, Gwen Shapira via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/75f748cb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/75f748cb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/75f748cb
Branch: refs/heads/trunk
Commit: 75f748cbd101d6efe8463a1c747fb87d2f668091
Parents: acc9651
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Sep 15 14:26:19 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Sep 15 14:26:19 2014 -0700
----------------------------------------------------------------------
flume-ng-dist/pom.xml | 4 +
flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 68 ++++++
.../org/apache/flume/sink/kafka/KafkaSink.java | 219 +++++++++++++++++++
.../flume/sink/kafka/KafkaSinkConstants.java | 31 +++
.../apache/flume/sink/kafka/TestConstants.java | 25 +++
.../apache/flume/sink/kafka/TestKafkaSink.java | 212 ++++++++++++++++++
.../flume/sink/kafka/util/KafkaConsumer.java | 98 +++++++++
.../flume/sink/kafka/util/KafkaLocal.java | 52 +++++
.../apache/flume/sink/kafka/util/TestUtil.java | 174 +++++++++++++++
.../flume/sink/kafka/util/ZooKeeperLocal.java | 62 ++++++
.../src/test/resources/kafka-server.properties | 117 ++++++++++
.../src/test/resources/log4j.properties | 78 +++++++
.../src/test/resources/zookeeper.properties | 20 ++
flume-ng-sinks/pom.xml | 1 +
pom.xml | 6 +
15 files changed, 1167 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 8c18af6..ca3cd8b 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -150,6 +150,10 @@
<artifactId>flume-ng-morphline-solr-sink</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-kafka-sink</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-scribe-source</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
new file mode 100644
index 0000000..307fa59
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>flume-ng-sinks</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.6.0-SNAPSHOT</version>
+ </parent>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-kafka-sink</artifactId>
+ <name>Flume Kafka Sink</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-configuration</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.1.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
new file mode 100644
index 0000000..a6121ac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -0,0 +1,219 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import com.google.common.base.Throwables;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.flume.*;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Flume Sink that can publish messages to Kafka.
+ * This is a general implementation that can be used with any Flume agent and
+ * a channel.
+ * The message can be any event and the key is a string that we read from the
+ * header
+ * For use of partitioning, use an interceptor to generate a header with the
+ * partition key
+ * <p/>
+ * Mandatory properties are:
+ * kafka.metadata.broker.list -- can be a partial list,
+ * but at least 2 are recommended for HA
+ * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one
+ * broker), -1 (accepted by all brokers)
+ * kafka.producer.type -- for safety, this should be sync
+ * <p/>
+ * <p/>
+ * however, any property starting with "kafka." will be passed along to the
+ * Kafka producer
+ * Read the Kafka producer documentation to see which configurations can be used
+ * <p/>
+ * Optional properties
+ * topic - there's a default, and also - this can be in the event header if
+ * you need to support events with
+ * different topics
+ * batchSize - how many messages to process in one batch. Larger batches
+ * improve throughput while adding latency.
+ * <p/>
+ * header properties (per event):
+ * topic
+ * key
+ */
+public class KafkaSink extends AbstractSink implements Configurable {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
+ public static final String KEY_HDR = "key";
+ public static final String TOPIC_HDR = "topic";
+ private Properties producerProps;
+ private Producer<String, byte[]> producer;
+ private String topic;
+ private int batchSize;
+ private List<KeyedMessage<String, byte[]>> messageList;
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ Status result = Status.READY;
+ Channel channel = getChannel();
+ Transaction transaction = null;
+ Event event = null;
+ String eventTopic = null;
+ String eventKey = null;
+
+ try {
+ long processedEvents = 0;
+
+ transaction = channel.getTransaction();
+ transaction.begin();
+
+ messageList.clear();
+ for (; processedEvents < batchSize; processedEvents += 1) {
+ event = channel.take();
+
+ if (event == null) {
+ // no events available in channel
+ break;
+ }
+
+ byte[] eventBody = event.getBody();
+ Map<String, String> headers = event.getHeaders();
+
+ if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
+ eventTopic = topic;
+ }
+
+ eventKey = headers.get(KEY_HDR);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
+ + new String(eventBody, "UTF-8"));
+ logger.debug("event #{}", processedEvents);
+ }
+
+ // create a message and add to buffer
+ KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
+ (eventTopic, eventKey, eventBody);
+ messageList.add(data);
+
+ }
+
+ // publish batch and commit.
+ if (processedEvents > 0) {
+ producer.send(messageList);
+ }
+
+ transaction.commit();
+
+ } catch (Exception ex) {
+ String errorMsg = "Failed to publish events";
+ logger.error("Failed to publish events", ex);
+ result = Status.BACKOFF;
+ if (transaction != null) {
+ try {
+ transaction.rollback();
+ } catch (Exception e) {
+ logger.error("Transaction rollback failed", e);
+ throw Throwables.propagate(e);
+ }
+ }
+ throw new EventDeliveryException(errorMsg, ex);
+ } finally {
+ if (transaction != null) {
+ transaction.close();
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public synchronized void start() {
+ // instantiate the producer
+ ProducerConfig config = new ProducerConfig(producerProps);
+ producer = new Producer<String, byte[]>(config);
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ producer.close();
+ super.stop();
+ }
+
+
+ @Override
+ public void configure(Context context) {
+
+ batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
+ KafkaSinkConstants.DEFAULT_BATCH_SIZE);
+ logger.debug("Using batch size: {}", batchSize);
+ messageList =
+ new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
+ Map<String, String> params = context.getParameters();
+ logger.debug("all params: " + params.entrySet().toString());
+ setProducerProps(params);
+ if (!producerProps.contains("serializer.class")) {
+ producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
+ }
+ if (!producerProps.contains("key.serializer.class")) {
+ producerProps.put("key.serializer.class",
+ "kafka.serializer.StringEncoder");
+ }
+
+ topic = context.getString(KafkaSinkConstants.TOPIC,
+ KafkaSinkConstants.DEFAULT_TOPIC);
+ if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
+ logger.warn("The Properties 'preprocessor' or 'topic' is not set. " +
+ "Using the default topic name" +
+ KafkaSinkConstants.DEFAULT_TOPIC);
+ } else {
+ logger.info("Using the static topic: " + topic +
+ " this may be over-ridden by event headers");
+ }
+ }
+
+
+ private void setProducerProps(Map<String, String> params) {
+ producerProps = new Properties();
+ for (String key : params.keySet()) {
+ String value = params.get(key).trim();
+ key = key.trim();
+ if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) {
+ // remove the prefix
+ key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1,
+ key.length());
+ producerProps.put(key.trim(), value);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reading a Kafka Producer Property: key: " + key +
+ ", value: " + value);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
new file mode 100644
index 0000000..48d875e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -0,0 +1,31 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+public class KafkaSinkConstants {
+
+ public static final String PROPERTY_PREFIX = "kafka";
+
+ /* Properties */
+ public static final String DEFAULT_TOPIC = "default-flume-topic";
+ public static final String TOPIC = "topic";
+ public static final String BATCH_SIZE = "batchSize";
+
+ public static final int DEFAULT_BATCH_SIZE = 100;
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
new file mode 100644
index 0000000..f99be53
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -0,0 +1,25 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+public class TestConstants {
+ public static final String STATIC_TOPIC = "static-topic";
+ public static final String CUSTOM_KEY = "custom-key";
+ public static final String CUSTOM_TOPIC = "custom-topic";
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
new file mode 100644
index 0000000..aed6dac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -0,0 +1,212 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import kafka.message.MessageAndMetadata;
+import org.apache.flume.*;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.kafka.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for Kafka Sink
+ */
+public class TestKafkaSink {
+
+ private static TestUtil testUtil = TestUtil.getInstance();
+
+ @BeforeClass
+ public static void setup() {
+ testUtil.prepare();
+ List<String> topics = new ArrayList<String>(3);
+ topics.add(KafkaSinkConstants.DEFAULT_TOPIC);
+ topics.add(TestConstants.STATIC_TOPIC);
+ topics.add(TestConstants.CUSTOM_TOPIC);
+ testUtil.initTopicList(topics);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ testUtil.tearDown();
+ }
+
+ @Test
+ public void testDefaultTopic() {
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "default-topic-test";
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes());
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ String fetchedMsg = new String((byte[])
+ testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)
+ .message());
+ assertEquals(msg, fetchedMsg);
+ }
+
+ @Test
+ public void testStaticTopic() {
+ Context context = prepareDefaultContext();
+ // add the static topic
+ context.put(KafkaSinkConstants.TOPIC, TestConstants.STATIC_TOPIC);
+ String msg = "static-topic-test";
+
+ try {
+ Sink.Status status = prepareAndSend(context, msg);
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(
+ TestConstants.STATIC_TOPIC).message());
+ assertEquals(msg, fetchedMsg);
+ }
+
+ @Test
+ public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
+
+
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "my message";
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("topic", TestConstants.CUSTOM_TOPIC);
+ headers.put("key", TestConstants.CUSTOM_KEY);
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes(), headers);
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ MessageAndMetadata fetchedMsg =
+ testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+ assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+ assertEquals(TestConstants.CUSTOM_KEY,
+ new String((byte[]) fetchedMsg.key(), "UTF-8"));
+
+ }
+
+ @Test
+ public void testEmptyChannel() throws UnsupportedEncodingException {
+
+
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+ assertNull(
+ testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
+
+ }
+
+
+ private Context prepareDefaultContext() {
+ // Prepares a default context with Kafka Server Properties
+ Context context = new Context();
+ context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl());
+ context.put("kafka.request.required.acks", "1");
+ context.put("batchSize", "1");
+ return context;
+ }
+
+ private Sink.Status prepareAndSend(Context context, String msg)
+ throws EventDeliveryException {
+ Sink kafkaSink = new KafkaSink();
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes());
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ return kafkaSink.process();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
new file mode 100644
index 0000000..1c98922
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
@@ -0,0 +1,98 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Kafka Consumer implementation. This uses the current thread to fetch the
+ * next message from the queue and doesn't use a multi threaded implementation.
+ * So this implements a synchronous blocking call.
+ * To avoid infinite waiting, a timeout is implemented to wait only for
+ * 10 seconds before concluding that the message will not be available.
+ */
+public class KafkaConsumer {
+
+ private static final Logger logger = LoggerFactory.getLogger(
+ KafkaConsumer.class);
+
+ private final ConsumerConnector consumer;
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
+
+ public KafkaConsumer() {
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+ createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1"));
+ }
+
+ private static ConsumerConfig createConsumerConfig(String zkUrl,
+ String groupId) {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", zkUrl);
+ props.put("group.id", groupId);
+ props.put("zookeeper.session.timeout.ms", "400");
+ props.put("zookeeper.sync.time.ms", "200");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("auto.offset.reset", "smallest");
+ props.put("consumer.timeout.ms","1000");
+ return new ConsumerConfig(props);
+ }
+
+ public void initTopicList(List<String> topics) {
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ for (String topic : topics) {
+ // we need only single threaded consumers
+ topicCountMap.put(topic, new Integer(1));
+ }
+ consumerMap = consumer.createMessageStreams(topicCountMap);
+ }
+
+ public MessageAndMetadata getNextMessage(String topic) {
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+ // it has only a single stream, because there is only one consumer
+ KafkaStream stream = streams.get(0);
+ final ConsumerIterator<byte[], byte[]> it = stream.iterator();
+ int counter = 0;
+ try {
+ if (it.hasNext()) {
+ return it.next();
+ } else {
+ return null;
+ }
+ } catch (ConsumerTimeoutException e) {
+ logger.error("0 messages available to fetch for the topic " + topic);
+ return null;
+ }
+ }
+
+ public void shutdown() {
+ consumer.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
new file mode 100644
index 0000000..3c6e064
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
@@ -0,0 +1,52 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Kafka server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class KafkaLocal {
+
+ public KafkaServerStartable kafka;
+ public ZooKeeperLocal zookeeper;
+
+ public KafkaLocal(Properties kafkaProperties) throws IOException,
+ InterruptedException{
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ //start local kafka broker
+ kafka = new KafkaServerStartable(kafkaConfig);
+ }
+
+ public void start() throws Exception{
+ kafka.startup();
+ }
+
+ public void stop(){
+ kafka.shutdown();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
new file mode 100644
index 0000000..8855c53
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -0,0 +1,174 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import kafka.message.MessageAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A utility class for starting/stopping Kafka Server.
+ */
+public class TestUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
+ private static TestUtil instance = new TestUtil();
+
+ private Random randPortGen = new Random(System.currentTimeMillis());
+ private KafkaLocal kafkaServer;
+ private KafkaConsumer kafkaConsumer;
+ private String hostname = "localhost";
+ private int kafkaLocalPort;
+ private int zkLocalPort;
+
+ private TestUtil() {
+ init();
+ }
+
+ public static TestUtil getInstance() {
+ return instance;
+ }
+
+ private void init() {
+ // get the localhost.
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Error getting the value of localhost. " +
+ "Proceeding with 'localhost'.", e);
+ }
+ }
+
+ private boolean startKafkaServer() {
+ Properties kafkaProperties = new Properties();
+ Properties zkProperties = new Properties();
+
+ try {
+ //load properties
+ zkProperties.load(Class.class.getResourceAsStream(
+ "/zookeeper.properties"));
+
+ ZooKeeperLocal zookeeper;
+ while (true) {
+ //start local Zookeeper
+ try {
+ zkLocalPort = getNextPort();
+ // override the Zookeeper client port with the generated one.
+ zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort));
+ zookeeper = new ZooKeeperLocal(zkProperties);
+ break;
+ } catch (BindException bindEx) {
+ // bind exception. port is already in use. Try a different port.
+ }
+ }
+ logger.info("ZooKeeper instance is successfully started on port " +
+ zkLocalPort);
+
+ kafkaProperties.load(Class.class.getResourceAsStream(
+ "/kafka-server.properties"));
+ // override the Zookeeper url.
+ kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
+ while (true) {
+ kafkaLocalPort = getNextPort();
+ // override the Kafka server port
+ kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+ kafkaServer = new KafkaLocal(kafkaProperties);
+ try {
+ kafkaServer.start();
+ break;
+ } catch (BindException bindEx) {
+ // let's try another port.
+ }
+ }
+ logger.info("Kafka Server is successfully started on port " +
+ kafkaLocalPort);
+ return true;
+
+ } catch (Exception e) {
+ logger.error("Error starting the Kafka Server.", e);
+ return false;
+ }
+ }
+
+ private KafkaConsumer getKafkaConsumer() {
+ synchronized (this) {
+ if (kafkaConsumer == null) {
+ kafkaConsumer = new KafkaConsumer();
+ }
+ }
+ return kafkaConsumer;
+ }
+
+ public void initTopicList(List<String> topics) {
+ getKafkaConsumer().initTopicList(topics);
+ }
+
+ public MessageAndMetadata getNextMessageFromConsumer(String topic) {
+ return getKafkaConsumer().getNextMessage(topic);
+ }
+
+ public void prepare() {
+ boolean startStatus = startKafkaServer();
+ if (!startStatus) {
+ throw new RuntimeException("Error starting the server!");
+ }
+ try {
+ Thread.sleep(3 * 1000); // add this sleep time to
+ // ensure that the server is fully started before proceeding with tests.
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ getKafkaConsumer();
+ logger.info("Completed the prepare phase.");
+ }
+
+ public void tearDown() {
+ logger.info("Shutting down the Kafka Consumer.");
+ getKafkaConsumer().shutdown();
+ try {
+ Thread.sleep(3 * 1000); // add this sleep time to
+ // ensure that the server is fully started before proceeding with tests.
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ logger.info("Shutting down the kafka Server.");
+ kafkaServer.stop();
+ logger.info("Completed the tearDown phase.");
+ }
+
+ private synchronized int getNextPort() {
+ // generate a random port number between 49152 and 65535
+ return randPortGen.nextInt(65535 - 49152) + 49152;
+ }
+
+ public String getZkUrl() {
+ return hostname + ":" + zkLocalPort;
+ }
+
+ public String getKafkaServerUrl() {
+ return hostname + ":" + kafkaLocalPort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
new file mode 100644
index 0000000..1a5728f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
@@ -0,0 +1,62 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka.util;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Zookeeper server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class ZooKeeperLocal {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(ZooKeeperLocal.class);
+ private ZooKeeperServerMain zooKeeperServer;
+
+ public ZooKeeperLocal(Properties zkProperties) throws IOException{
+ QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+ try {
+ quorumConfiguration.parseProperties(zkProperties);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ zooKeeperServer = new ZooKeeperServerMain();
+ final ServerConfig configuration = new ServerConfig();
+ configuration.readFrom(quorumConfiguration);
+
+ new Thread() {
+ public void run() {
+ try {
+ zooKeeperServer.runFromConfig(configuration);
+ } catch (IOException e) {
+ logger.error("Zookeeper startup failed.", e);
+ }
+ }
+ }.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
new file mode 100644
index 0000000..c07cdea
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma seperated list of directories under which to store log files
+log.dirs=target/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=2
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bdcb643
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=target/logs
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
new file mode 100644
index 0000000..89e1b5e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=target
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 3381bde..4bac019 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -46,6 +46,7 @@ limitations under the License.
<module>flume-ng-hbase-sink</module>
<module>flume-ng-elasticsearch-sink</module>
<module>flume-ng-morphline-solr-sink</module>
+ <module>flume-ng-kafka-sink</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 150db2e..740edc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1139,6 +1139,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-kafka-sink</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-scribe-source</artifactId>
<version>1.6.0-SNAPSHOT</version>