Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/24 08:11:20 UTC
git commit: FLUME-2470. Kafka Sink and Source must use camel case for
all configs.
Repository: flume
Updated Branches:
refs/heads/trunk 186a3b808 -> bde2c2821
FLUME-2470. Kafka Sink and Source must use camel case for all configs.
(Gwen Shapira via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bde2c282
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bde2c282
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bde2c282
Branch: refs/heads/trunk
Commit: bde2c28211a2d05a9930f1599cb15864ad3cdba0
Parents: 186a3b8
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Sep 23 23:10:25 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Sep 23 23:10:25 2014 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 67 ++++++++-------
.../org/apache/flume/sink/kafka/KafkaSink.java | 65 ++++++---------
.../flume/sink/kafka/KafkaSinkConstants.java | 20 ++++-
.../apache/flume/sink/kafka/TestKafkaSink.java | 23 ++----
.../src/test/resources/kafka-server.properties | 1 +
.../src/test/resources/log4j.properties | 2 +-
.../apache/flume/source/kafka/KafkaSource.java | 40 ++++++---
.../source/kafka/KafkaSourceConstants.java | 15 ++--
.../flume/source/kafka/KafkaSourceUtil.java | 87 +++++++++++++++-----
.../flume/source/kafka/KafkaSourceTest.java | 12 ++-
.../flume/source/kafka/KafkaSourceUtilTest.java | 43 +++++++---
11 files changed, 232 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 11c1ad7..ce52946 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1101,29 +1101,33 @@ Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
If you have multiple Kafka sources running, you can configure them with the same Consumer Group
so each will read a unique set of partitions for the topic.
-The properties below are required properties, but you can specify any Kafka parameter you want
-and it will be passed to the consumer. Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_
-for details
-
-=========================== =========== ===================================================
-Property Name Default Description
-=========================== =========== ===================================================
-**channels** --
-**type** -- The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource``
-**kafka.zookeeper.connect** -- URI of ZooKeeper used by Kafka cluster
-**kadka.group.id** -- Unique identified of consumer group. Setting the same id in multiple sources or agents
- indicates that they are part of the same consumer group
-**topic** -- Kafka topic we'll read messages from. At the time, this is a single topic only.
-batchSize 1000 Maximum number of messages written to Channel in one batch
-batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel
- The batch will be written whenever the first of size and time will be reached.
-kafka.auto.commit.enable false If true, Kafka will commit events automatically - faster but less durable option.
- when false, the Kafka Source will commit events before writing batch to channel
-consumer.timeout.ms 10 Polling interval for new data for batch.
- Low value means more CPU usage.
- High value means the maxBatchDurationMillis may be missed while waiting for
- additional data.
-=========================== =========== ===================================================
+
+
+=============================== =========== ===================================================
+Property Name Default Description
+=============================== =========== ===================================================
+**channels** --
+**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster
+**groupId** flume Unique identifier of the consumer group. Setting the same id in multiple sources or agents
+ indicates that they are part of the same consumer group
+**topic** -- Kafka topic we'll read messages from. Currently, only a single topic is supported.
+batchSize 1000 Maximum number of messages written to Channel in one batch
+batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel
+ The batch is written as soon as either the size or the time limit is reached.
+Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any consumer property supported
+ by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+ For example: kafka.consumer.timeout.ms
+ Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ for details
+=============================== =========== ===================================================
+
+.. note:: The Kafka Source overrides two Kafka consumer parameters:
+ auto.commit.enable is set to "false" by the source, and we commit every batch. For improved performance
+ this can be set to "true"; however, this can lead to loss of data.
+ consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive.
+ Setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means
+ higher latency in writing batches to channel (since we'll wait longer for data to arrive).
+
Example for agent named tier1:
@@ -1131,9 +1135,9 @@ Example for agent named tier1:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
- tier1.sources.source1.kafka.zookeeper.connect = localhost:2181
+ tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
- tier1.sources.source1.kafka.group.id = flume
+ tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100
@@ -2152,7 +2156,7 @@ Required properties are marked in bold font.
Property Name Default Description
=============================== =================== =============================================================================================
**type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
-**kafka.metadata.broker.list** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+**brokerList** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions
This can be a partial list of brokers, but we recommend at least two for HA.
The format is comma separated list of hostname:port
topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured,
@@ -2160,13 +2164,12 @@ topic default-flume-topic The topic in Kafka to whic
If the event header contains a "topic" field, the event will be published to that topic
overriding the topic configured here.
batchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
-kafka.request.required.acks 0 How many replicas must acknowledge a message before its considered successfully written.
+requiredAcks 1 How many replicas must acknowledge a message before it is considered successfully written.
Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
- The default is the fastest option, but we *highly recommend* setting this to -1 to avoid data loss
-kafka.producer.type sync Whether messages should be sent to broker synchronously or using an asynchronous background thread.
- Accepted values are sync (safest) and async (faster but potentially unsafe)
+ Set this to -1 to avoid data loss in some cases of leader failure.
Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported
by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
+ For example: kafka.producer.type
=============================== =================== =============================================================================================
.. note:: Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
@@ -2186,8 +2189,8 @@ argument.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mytopic
- a1.sinks.k1.kafka.metadata.broker.list = localhost:9092
- a1.sinks.k1.kafka.request.required.acks = 1
+ a1.sinks.k1.brokerList = localhost:9092
+ a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index a6121ac..a90b950 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -28,10 +28,10 @@ import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import java.util.Properties;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
+import java.util.ArrayList;
/**
* A Flume Sink that can publish messages to Kafka.
@@ -43,11 +43,7 @@ import java.util.Properties;
* partition key
* <p/>
* Mandatory properties are:
- * kafka.metadata.broker.list -- can be a partial list,
- * but at least 2 are recommended for HA
- * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one
- * broker), -1 (accepted by all brokers)
- * kafka.producer.type -- for safety, this should be sync
+ * brokerList -- can be a partial list, but at least 2 are recommended for HA
* <p/>
* <p/>
* however, any property starting with "kafka." will be passed along to the
@@ -60,6 +56,8 @@ import java.util.Properties;
* different topics
* batchSize - how many messages to process in one batch. Larger batches
* improve throughput while adding latency.
+ * requiredAcks -- 0 (unsafe), 1 (accepted by at least one broker, default),
+ * -1 (accepted by all brokers)
* <p/>
* header properties (per event):
* topic
@@ -70,7 +68,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
public static final String KEY_HDR = "key";
public static final String TOPIC_HDR = "topic";
- private Properties producerProps;
+ private Properties kafkaProps;
private Producer<String, byte[]> producer;
private String topic;
private int batchSize;
@@ -154,7 +152,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
@Override
public synchronized void start() {
// instantiate the producer
- ProducerConfig config = new ProducerConfig(producerProps);
+ ProducerConfig config = new ProducerConfig(kafkaProps);
producer = new Producer<String, byte[]>(config);
super.start();
}
@@ -166,54 +164,43 @@ public class KafkaSink extends AbstractSink implements Configurable {
}
+ /**
+ * We configure the sink and generate properties for the Kafka Producer
+ *
+ * Kafka producer properties are generated as follows:
+ * 1. We generate a properties object with some static defaults that
+ * can be overridden by Sink configuration
+ * 2. We add the configuration users added for Kafka (parameters starting
+ * with "kafka."; these must be valid Kafka Producer properties)
+ * 3. We add the sink's documented parameters which can override other
+ * properties
+ *
+ * @param context
+ */
@Override
public void configure(Context context) {
batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
KafkaSinkConstants.DEFAULT_BATCH_SIZE);
- logger.debug("Using batch size: {}", batchSize);
messageList =
new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
- Map<String, String> params = context.getParameters();
- logger.debug("all params: " + params.entrySet().toString());
- setProducerProps(params);
- if (!producerProps.contains("serializer.class")) {
- producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
- }
- if (!producerProps.contains("key.serializer.class")) {
- producerProps.put("key.serializer.class",
- "kafka.serializer.StringEncoder");
- }
+ logger.debug("Using batch size: {}", batchSize);
topic = context.getString(KafkaSinkConstants.TOPIC,
KafkaSinkConstants.DEFAULT_TOPIC);
if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
- logger.warn("The Properties 'preprocessor' or 'topic' is not set. " +
- "Using the default topic name" +
+ logger.warn("The Property 'topic' is not set. " +
+ "Using the default topic name: " +
KafkaSinkConstants.DEFAULT_TOPIC);
} else {
logger.info("Using the static topic: " + topic +
" this may be over-ridden by event headers");
}
- }
+ kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
- private void setProducerProps(Map<String, String> params) {
- producerProps = new Properties();
- for (String key : params.keySet()) {
- String value = params.get(key).trim();
- key = key.trim();
- if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) {
- // remove the prefix
- key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1,
- key.length());
- producerProps.put(key.trim(), value);
- if (logger.isDebugEnabled()) {
- logger.debug("Reading a Kafka Producer Property: key: " + key +
- ", value: " + value);
- }
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Kafka producer properties: " + kafkaProps);
}
}
-
}
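A side note on the lines removed from configure() above: the old code guarded its serializer defaults with producerProps.contains("serializer.class"). Properties inherits contains() from java.util.Hashtable, where it tests values rather than keys, so that guard would not notice a key the user had actually set. The sketch below illustrates the difference using only the JDK; the class and method names are made up for illustration and are not part of this commit.

```java
import java.util.Properties;

// Illustrative only: this class is not part of the Flume code base.
final class ContainsVsContainsKey {

    // Key lookup: the check a "did the user set this property?" guard needs.
    static boolean hasKey(Properties props, String key) {
        return props.containsKey(key);
    }

    // Hashtable.contains(Object) searches the VALUES, not the keys.
    static boolean hasValue(Properties props, String candidate) {
        return props.contains(candidate);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");

        // The property name is found as a key, but not as a value.
        System.out.println(hasKey(props, "serializer.class"));
        System.out.println(hasValue(props, "serializer.class"));
    }
}
```

With the property set as shown, the key lookup succeeds while the value lookup on the same string does not.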
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 48d875e..3ee12de 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -18,14 +18,30 @@
package org.apache.flume.sink.kafka;
+import kafka.serializer.StringDecoder;
+
public class KafkaSinkConstants {
- public static final String PROPERTY_PREFIX = "kafka";
+ public static final String PROPERTY_PREFIX = "kafka.";
/* Properties */
- public static final String DEFAULT_TOPIC = "default-flume-topic";
+
public static final String TOPIC = "topic";
public static final String BATCH_SIZE = "batchSize";
+ public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
+ public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
+ public static final String BROKER_LIST_KEY = "metadata.broker.list";
+ public static final String REQUIRED_ACKS_KEY = "request.required.acks";
+ public static final String BROKER_LIST_FLUME_KEY = "brokerList";
+ public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
+
+
public static final int DEFAULT_BATCH_SIZE = 100;
+ public static final String DEFAULT_TOPIC = "default-flume-topic";
+ public static final String DEFAULT_MESSAGE_SERIALIZER =
+ "kafka.serializer.DefaultEncoder";
+ public static final String DEFAULT_KEY_SERIALIZER =
+ "kafka.serializer.StringEncoder";
+ public static final String DEFAULT_REQUIRED_ACKS = "1";
}
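PROPERTY_PREFIX changes from "kafka" to "kafka." in this file. A rough sketch of why the trailing dot matters when selecting and stripping pass-through keys follows. It uses plain java.util maps rather than Flume's Context, and PrefixSketch and stripPrefix are hypothetical names chosen for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: not how Flume's Context is implemented.
final class PrefixSketch {

    // Keep entries whose key starts with the prefix, with the prefix removed.
    static Map<String, String> stripPrefix(Map<String, String> config,
                                           String prefix) {
        Map<String, String> out = new LinkedHashMap<String, String>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            String key = e.getKey().trim();
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), e.getValue().trim());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<String, String>();
        config.put("kafka.producer.type", "sync");
        config.put("kafkaesque", "not a Kafka property");
        config.put("batchSize", "20");

        // Dotted prefix: only the genuine pass-through key survives.
        System.out.println(stripPrefix(config, "kafka."));
        // Bare prefix: "kafkaesque" also matches and the dot is left behind.
        System.out.println(stripPrefix(config, "kafka"));
    }
}
```

Including the dot in the constant also removes the need for the "+ 1" offset that the old substring() calls carried.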
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index aed6dac..80f764f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -126,7 +127,7 @@ public class TestKafkaSink {
kafkaSink.setChannel(memoryChannel);
kafkaSink.start();
- String msg = "my message";
+ String msg = "test-topic-and-key-from-header";
Map<String, String> headers = new HashMap<String, String>();
headers.put("topic", TestConstants.CUSTOM_TOPIC);
headers.put("key", TestConstants.CUSTOM_KEY);
@@ -156,9 +157,8 @@ public class TestKafkaSink {
}
@Test
- public void testEmptyChannel() throws UnsupportedEncodingException {
-
-
+ public void testEmptyChannel() throws UnsupportedEncodingException,
+ EventDeliveryException {
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
Configurables.configure(kafkaSink, context);
@@ -167,25 +167,20 @@ public class TestKafkaSink {
kafkaSink.setChannel(memoryChannel);
kafkaSink.start();
- try {
- Sink.Status status = kafkaSink.process();
- if (status == Sink.Status.BACKOFF) {
- fail("Error Occurred");
- }
- } catch (EventDeliveryException ex) {
- // ignore
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
}
assertNull(
testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC));
-
}
-
private Context prepareDefaultContext() {
// Prepares a default context with Kafka Server Properties
Context context = new Context();
- context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl());
+ context.put("brokerList", testUtil.getKafkaServerUrl());
context.put("kafka.request.required.acks", "1");
+ context.put("kafka.producer.type","sync");
context.put("batchSize", "1");
return context;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
index c07cdea..02a81e2 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -1,4 +1,5 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
+# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
index bdcb643..b86600b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
@@ -15,7 +15,7 @@
kafka.logs.dir=target/logs
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index da78f80..231ae42 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
@@ -66,6 +67,7 @@ public class KafkaSource extends AbstractSource
private int consumerTimeout;
private boolean kafkaAutoCommitEnabled;
private Context context;
+ private Properties kafkaProps;
private final List<Event> eventList = new ArrayList<Event>();
public Status process() throws EventDeliveryException {
@@ -122,6 +124,19 @@ public class KafkaSource extends AbstractSource
}
}
+ /**
+ * We configure the source and generate properties for the Kafka Consumer
+ *
+ * Kafka Consumer properties are generated as follows:
+ * 1. Generate a properties object with some static defaults that
+ * can be overridden by Source configuration
+ * 2. We add the configuration users added for Kafka (parameters starting
+ * with "kafka."; these must be valid Kafka Consumer properties)
+ * 3. We add the source documented parameters which can override other
+ * properties
+ *
+ * @param context
+ */
public void configure(Context context) {
this.context = context;
batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
@@ -130,21 +145,16 @@ public class KafkaSource extends AbstractSource
KafkaSourceConstants.DEFAULT_BATCH_DURATION);
topic = context.getString(KafkaSourceConstants.TOPIC);
- //if consumer timeout and autocommit were not set by user,
- // set them to 10ms and false
- consumerTimeout = context.getInteger(KafkaSourceConstants.CONSUMER_TIMEOUT,
- KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
- context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
- Integer.toString(consumerTimeout));
- String autoCommit = context.getString(
- KafkaSourceConstants.AUTO_COMMIT_ENABLED,
- String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT));
- kafkaAutoCommitEnabled = Boolean.valueOf(autoCommit);
- context.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,autoCommit);
-
if(topic == null) {
throw new ConfigurationException("Kafka topic must be specified.");
}
+
+ kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
+ consumerTimeout = Integer.parseInt(kafkaProps.getProperty(
+ KafkaSourceConstants.CONSUMER_TIMEOUT));
+ kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
+ KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+
}
@Override
@@ -153,7 +163,7 @@ public class KafkaSource extends AbstractSource
try {
//initialize a consumer. This creates the connection to ZooKeeper
- consumer = KafkaSourceUtil.getConsumer(context);
+ consumer = KafkaSourceUtil.getConsumer(kafkaProps);
} catch (Exception e) {
throw new FlumeException("Unable to create consumer. " +
"Check whether the ZooKeeper server is up and that the " +
@@ -192,6 +202,10 @@ public class KafkaSource extends AbstractSource
}
+
+
+
+
/**
* Check if there are messages waiting in Kafka,
* waiting until timeout (10ms by default) for messages to arrive.
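The three-step precedence described in the configure() Javadoc above (static defaults, then "kafka."-prefixed pass-through, then the documented camel-case parameters) can be sketched as follows. This is a simplified stand-in that takes a plain Map instead of a Flume Context; LayeredConsumerProps and build are hypothetical names, and the property names are the ones that appear in this commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: not part of the Flume code base.
final class LayeredConsumerProps {

    private static final String KAFKA_PREFIX = "kafka.";

    static Properties build(Map<String, String> sourceConfig) {
        Properties props = new Properties();

        // Step 1: static defaults, stored as Strings.
        props.setProperty("consumer.timeout.ms", "10");
        props.setProperty("group.id", "flume");

        // Step 2: pass through every "kafka."-prefixed key, prefix removed.
        for (Map.Entry<String, String> e : sourceConfig.entrySet()) {
            if (e.getKey().startsWith(KAFKA_PREFIX)) {
                props.setProperty(
                    e.getKey().substring(KAFKA_PREFIX.length()), e.getValue());
            }
        }

        // Step 3: documented camel-case parameters win over everything else.
        String zookeeperConnect = sourceConfig.get("zookeeperConnect");
        if (zookeeperConnect == null) {
            throw new IllegalArgumentException("zookeeperConnect must be set");
        }
        props.setProperty("zookeeper.connect", zookeeperConnect);

        String groupId = sourceConfig.get("groupId");
        if (groupId != null) {
            props.setProperty("group.id", groupId);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<String, String>();
        config.put("kafka.zookeeper.connect", "bad-zookeeper-list");
        config.put("zookeeperConnect", "real-zookeeper-list");
        config.put("kafka.consumer.timeout.ms", "100");

        Properties props = build(config);
        System.out.println(props.getProperty("zookeeper.connect"));
        System.out.println(props.getProperty("consumer.timeout.ms"));
        System.out.println(props.getProperty("group.id"));
    }
}
```

Applying the layers in this order means a user can tune any consumer setting through the prefix, while a value given under a documented name such as zookeeperConnect always takes effect.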
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index ac86f65..169cc10 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -21,16 +21,19 @@ public class KafkaSourceConstants {
public static final String TIMESTAMP = "timestamp";
public static final String BATCH_SIZE = "batchSize";
public static final String BATCH_DURATION_MS = "batchDurationMillis";
- public static final String CONSUMER_TIMEOUT = "kafka.consumer.timeout.ms";
- public static final String AUTO_COMMIT_ENABLED = "kafka.auto.commit.enabled";
- public static final String ZOOKEEPER_CONNECT = "kafka.zookeeper.connect";
- public static final String GROUP_ID = "kafka.group.id";
- public static final String PROPERTY_PREFIX = "kafka";
+ public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
+ public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled";
+ public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+ public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect";
+ public static final String GROUP_ID = "group.id";
+ public static final String GROUP_ID_FLUME = "groupId";
+ public static final String PROPERTY_PREFIX = "kafka.";
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final int DEFAULT_BATCH_DURATION = 1000;
- public static final int DEFAULT_CONSUMER_TIMEOUT = 10;
+ public static final String DEFAULT_CONSUMER_TIMEOUT = "10";
public static final boolean DEFAULT_AUTO_COMMIT = false;
+ public static final String DEFAULT_GROUP_ID = "flume";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
index 8397272..4a4034b 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
@@ -25,6 +25,7 @@ import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flume.Context;
+import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,32 +33,80 @@ public class KafkaSourceUtil {
private static final Logger log =
LoggerFactory.getLogger(KafkaSourceUtil.class);
- public static Properties getKafkaConfigProperties(Context context) {
+ public static Properties getKafkaProperties(Context context) {
log.info("context={}",context.toString());
- Properties props = new Properties();
- Map<String, String> contextMap = context.getParameters();
- for(String key : contextMap.keySet()) {
- String value = contextMap.get(key).trim();
- key = key.trim();
- if (key.startsWith(KafkaSourceConstants.PROPERTY_PREFIX)) {
- // remove the prefix
- key = key.substring(KafkaSourceConstants.PROPERTY_PREFIX.length() + 1,
- key.length());
- props.put(key, value);
- if (log.isDebugEnabled()) {
- log.debug("Reading a Kafka Producer Property: key: " + key +
- ", value: " + value);
- }
- }
- }
+ Properties props = generateDefaultKafkaProps();
+ setKafkaProps(context,props);
+ addDocumentedKafkaProps(context,props);
return props;
}
- public static ConsumerConnector getConsumer(Context context) {
+ public static ConsumerConnector getConsumer(Properties kafkaProps) {
ConsumerConfig consumerConfig =
- new ConsumerConfig(getKafkaConfigProperties(context));
+ new ConsumerConfig(kafkaProps);
ConsumerConnector consumer =
Consumer.createJavaConsumerConnector(consumerConfig);
return consumer;
}
+
+ /**
+ * Generate consumer properties object with some defaults
+ * @return
+ */
+ private static Properties generateDefaultKafkaProps() {
+ Properties props = new Properties();
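+ // Store the default as a String: Properties.getProperty() returns null
+ // for values that are not Strings.
+ props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,
+ String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT));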
+ props.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
+ KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
+ props.put(KafkaSourceConstants.GROUP_ID,
+ KafkaSourceConstants.DEFAULT_GROUP_ID);
+ return props;
+ }
+
+ /**
+ * Add all configuration parameters starting with "kafka."
+ * to consumer properties, with the prefix removed
+ */
+ private static void setKafkaProps(Context context,Properties kafkaProps) {
+
+ Map<String,String> kafkaProperties =
+ context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX);
+
+ for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
+
+ kafkaProps.put(prop.getKey(), prop.getValue());
+ if (log.isDebugEnabled()) {
+ log.debug("Reading a Kafka Consumer Property: key: "
+ + prop.getKey() + ", value: " + prop.getValue());
+ }
+ }
+ }
+
+ /**
+ * Some of the consumer properties are especially important.
+ * We documented them and gave them a camel-case name to match Flume config.
+ * If the user sets these, we override any existing parameters with these
+ * settings.
+ * Knowledge of which properties are documented is maintained here for now.
+ * If this becomes a maintenance issue, we'll introduce a proper data structure.
+ */
+ private static void addDocumentedKafkaProps(Context context,
+ Properties kafkaProps)
+ throws ConfigurationException {
+ String zookeeperConnect = context.getString(
+ KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
+ if (zookeeperConnect == null) {
+ throw new ConfigurationException("ZookeeperConnect must contain " +
+ "at least one ZooKeeper server");
+ }
+ kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect);
+
+ String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
+
+ if (groupID != null ) {
+ kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID);
+ }
+ }
+
}
\ No newline at end of file
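A general caveat for default-seeding code such as generateDefaultKafkaProps(): java.util.Properties.getProperty() only returns values that are Strings. A value stored through put() as a Boolean or Integer is still in the table, but getProperty() reports it as null. The JDK-only sketch below shows the behavior; the class name and the "example.flag" key are invented for the illustration.

```java
import java.util.Properties;

// Illustrative only: not part of the Flume code base.
final class NonStringPropertyPitfall {

    // Stores a boxed Boolean; legal, because Properties extends Hashtable.
    static Properties withRawBoolean() {
        Properties props = new Properties();
        props.put("example.flag", Boolean.FALSE);
        return props;
    }

    // Stores the same default as a String, which getProperty() can see.
    static Properties withStringValue() {
        Properties props = new Properties();
        props.setProperty("example.flag", String.valueOf(false));
        return props;
    }

    public static void main(String[] args) {
        // The Boolean is invisible to getProperty(); the String is not.
        System.out.println(withRawBoolean().getProperty("example.flag"));
        System.out.println(withStringValue().getProperty("example.flag"));
    }
}
```

Code that later parses the result with Boolean.parseBoolean() will quietly treat the missing value as false, so converting defaults with String.valueOf() or using setProperty() avoids relying on that accident.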
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
index 1009f1c..d067e24 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.*;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Properties;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import kafka.message.MessageAndMetadata;
import org.apache.flume.*;
import org.apache.flume.PollableSource.Status;
import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.junit.After;
import org.junit.Before;
@@ -74,11 +76,11 @@ public class KafkaSourceTest {
}
context = new Context();
- context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,
+ context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
kafkaServer.getZkConnectString());
- context.put(KafkaSourceConstants.GROUP_ID,"flume");
+ context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume");
context.put(KafkaSourceConstants.TOPIC,topicName);
- context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,"100");
+ context.put("kafka.consumer.timeout.ms","100");
ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
@@ -183,7 +185,7 @@ public class KafkaSourceTest {
public void testNonExistingZk() throws EventDeliveryException,
SecurityException, NoSuchFieldException, IllegalArgumentException,
IllegalAccessException, InterruptedException {
- context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,"blabla:666");
+ context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666");
kafkaSource.configure(context);
kafkaSource.start();
Thread.sleep(500L);
@@ -192,4 +194,6 @@ public class KafkaSourceTest {
assertEquals(Status.BACKOFF, status);
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
index b9a1b25..f87e5ae 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
@@ -39,10 +39,12 @@ public class KafkaSourceUtilTest {
@Before
public void setUp() throws Exception {
- context.put("consumer.timeout", "10");
+ context.put("kafka.consumer.timeout", "10");
context.put("type", "KafkaSource");
context.put("topic", "test");
- props = KafkaSourceUtil.getKafkaConfigProperties(context);
+ context.put("zookeeperConnect", "127.0.0.1:"+zkPort);
+ context.put("groupId","test");
+ props = KafkaSourceUtil.getKafkaProperties(context);
zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
@@ -53,23 +55,38 @@ public class KafkaSourceUtilTest {
zookeeper.stopZookeeper();
}
- @Test
- public void testGetKafkaConfigParameter() {
- assertEquals("10",props.getProperty("consumer.timeout"));
- assertEquals("test",props.getProperty("topic"));
- assertNull(props.getProperty("type"));
- }
-
@Test
public void testGetConsumer() {
- context.put("zookeeper.connect", "127.0.0.1:"+zkPort);
- context.put("group.id","test");
-
- ConsumerConnector cc = KafkaSourceUtil.getConsumer(context);
+ ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
assertNotNull(cc);
}
+ @Test
+ public void testKafkaConsumerProperties() {
+ Context context = new Context();
+ context.put("kafka.auto.commit.enabled", "override.default.autocommit");
+ context.put("kafka.fake.property", "kafka.property.value");
+ context.put("kafka.zookeeper.connect","bad-zookeeper-list");
+ context.put("zookeeperConnect","real-zookeeper-list");
+ Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
+
+ //check that we have defaults set
+ assertEquals(
+ KafkaSourceConstants.DEFAULT_GROUP_ID,
+ kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID));
+ //check that kafka properties override the default and get correct name
+ assertEquals(
+ "override.default.autocommit",
+ kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+ //check that any kafka property gets in
+ assertEquals("kafka.property.value",
+ kafkaProps.getProperty("fake.property"));
+ //check that documented property overrides defaults
+ assertEquals("real-zookeeper-list",
+ kafkaProps.getProperty("zookeeper.connect"));
+ }
+
}