Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/29 19:44:37 UTC
[3/3] flume git commit: FLUME-2823: Flume-Kafka-Channel with new APIs
FLUME-2823: Flume-Kafka-Channel with new APIs
(Jeff Holoman via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e8c4a7bf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e8c4a7bf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e8c4a7bf
Branch: refs/heads/trunk
Commit: e8c4a7bffc74f6ea10ae6cc45adbaf4919f45186
Parents: 7f588e6
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Mar 29 09:45:40 2016 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Mar 29 09:45:40 2016 -0700
----------------------------------------------------------------------
flume-ng-channels/flume-kafka-channel/pom.xml | 6 +
.../flume/channel/kafka/KafkaChannel.java | 641 ++++++++++++-------
.../kafka/KafkaChannelConfiguration.java | 32 +-
.../flume/channel/kafka/TestKafkaChannel.java | 219 ++++---
.../src/test/resources/kafka-server.properties | 2 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 88 ++-
6 files changed, 634 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index fa1bd42..587b4b4 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -40,6 +40,12 @@ limitations under the License.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index c0c1c66..2d9b0c6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,108 +20,120 @@ package org.apache.flume.channel.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import kafka.consumer.*;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.apache.avro.io.*;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.conf.ConfigurationException;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
public class KafkaChannel extends BasicChannelSemantics {
- private final static Logger LOGGER =
- LoggerFactory.getLogger(KafkaChannel.class);
+ private final static Logger logger =
+ LoggerFactory.getLogger(KafkaChannel.class);
+ private final Properties consumerProps = new Properties();
+ private final Properties producerProps = new Properties();
- private final Properties kafkaConf = new Properties();
- private Producer<String, byte[]> producer;
+ private KafkaProducer<String, byte[]> producer;
private final String channelUUID = UUID.randomUUID().toString();
private AtomicReference<String> topic = new AtomicReference<String>();
private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
- private final Map<String, Integer> topicCountMap =
- Collections.synchronizedMap(new HashMap<String, Integer>());
+
+ //used to indicate if a rebalance has occurred during the current transaction
+ AtomicBoolean rebalanceFlag = new AtomicBoolean();
+ //This isn't a Kafka property per se, but we allow it to be configurable
+ private long pollTimeout = DEFAULT_POLL_TIMEOUT;
+
// Track all consumers to close them eventually.
- private final List<ConsumerAndIterator> consumers =
- Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
+ private final List<ConsumerAndRecords> consumers =
+ Collections.synchronizedList(new LinkedList<ConsumerAndRecords>());
private KafkaChannelCounter counter;
- /* Each ConsumerConnector commit will commit all partitions owned by it. To
+ /* Each Consumer commit will commit all partitions owned by it. To
* ensure that each partition is only committed when all events are
- * actually done, we will need to keep a ConsumerConnector per thread.
- * See Neha's answer here:
- * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic
- * Since only one consumer connector will a partition at any point in time,
- * when we commit the partition we would have committed all events to the
- * final destination from that partition.
- *
- * If a new partition gets assigned to this connector,
- * my understanding is that all message from the last partition commit will
- * get replayed which may cause duplicates -- which is fine as this
- * happens only on partition rebalancing which is on failure or new nodes
- * coming up, which is rare.
+ * actually done, we will need to keep a Consumer per thread.
*/
- private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new
- ThreadLocal<ConsumerAndIterator>() {
- @Override
- public ConsumerAndIterator initialValue() {
- return createConsumerAndIter();
- }
- };
+
+ private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new
+ ThreadLocal<ConsumerAndRecords>() {
+ @Override
+ public ConsumerAndRecords initialValue() {
+ return createConsumerAndRecords();
+ }
+ };
@Override
public void start() {
- try {
- LOGGER.info("Starting Kafka Channel: " + getName());
- producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf));
+ logger.info("Starting Kafka Channel: {}", getName());
+ producer = new KafkaProducer<String, byte[]>(producerProps);
// We always have just one topic being read by one thread
- LOGGER.info("Topic = " + topic.get());
- topicCountMap.put(topic.get(), 1);
+ logger.info("Topic = {}", topic.get());
counter.start();
super.start();
- } catch (Exception e) {
- LOGGER.error("Could not start producer");
- throw new FlumeException("Unable to create Kafka Connections. " +
- "Check whether Kafka Brokers are up and that the " +
- "Flume agent can connect to it.", e);
- }
}
@Override
public void stop() {
- for (ConsumerAndIterator c : consumers) {
+ for (ConsumerAndRecords c : consumers) {
try {
- decommissionConsumerAndIterator(c);
+ decommissionConsumerAndRecords(c);
} catch (Exception ex) {
- LOGGER.warn("Error while shutting down consumer.", ex);
+ logger.warn("Error while shutting down consumer.", ex);
}
}
producer.close();
counter.stop();
super.stop();
- LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(),
- counter);
+ logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
+ counter);
}
@Override
@@ -129,98 +141,147 @@ public class KafkaChannel extends BasicChannelSemantics {
return new KafkaTransaction();
}
- private synchronized ConsumerAndIterator createConsumerAndIter() {
- try {
- ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf);
- ConsumerConnector consumer =
- Consumer.createJavaConsumerConnector(consumerConfig);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
- consumer.createMessageStreams(topicCountMap);
- final List<KafkaStream<byte[], byte[]>> streamList = consumerMap
- .get(topic.get());
- KafkaStream<byte[], byte[]> stream = streamList.remove(0);
- ConsumerAndIterator ret =
- new ConsumerAndIterator(consumer, stream.iterator(), channelUUID);
- consumers.add(ret);
- LOGGER.info("Created new consumer to connect to Kafka");
- return ret;
- } catch (Exception e) {
- throw new FlumeException("Unable to connect to Kafka", e);
- }
- }
-
- Properties getKafkaConf() {
- return kafkaConf;
- }
-
@Override
public void configure(Context ctx) {
- String topicStr = ctx.getString(TOPIC);
+
+ //Can remove in the next release
+ translateOldProps(ctx);
+
+ String topicStr = ctx.getString(TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
topicStr = DEFAULT_TOPIC;
- LOGGER
- .info("Topic was not specified. Using " + topicStr + " as the topic.");
+ logger.info("Topic was not specified. Using {} as the topic.", topicStr);
}
topic.set(topicStr);
- String groupId = ctx.getString(GROUP_ID_FLUME);
- if (groupId == null || groupId.isEmpty()) {
- groupId = DEFAULT_GROUP_ID;
- LOGGER.info(
- "Group ID was not specified. Using " + groupId + " as the group id.");
+ String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
+ if (bootStrapServers == null || bootStrapServers.isEmpty()) {
+ throw new ConfigurationException("Bootstrap Servers must be specified");
}
- String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
- if (brokerList == null || brokerList.isEmpty()) {
- throw new ConfigurationException("Broker List must be specified");
+
+ setProducerProps(ctx, bootStrapServers);
+ setConsumerProps(ctx, bootStrapServers);
+
+ parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
+ pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
+
+ if (counter == null) {
+ counter = new KafkaChannelCounter(getName());
}
- String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
- if (zkConnect == null || zkConnect.isEmpty()) {
- throw new ConfigurationException(
- "Zookeeper Connection must be specified");
+ }
+
+ // We can remove this once the properties are officially deprecated
+ private void translateOldProps(Context ctx) {
+
+ if (!(ctx.containsKey(TOPIC_CONFIG))) {
+ ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
+ logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
}
- kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
- kafkaConf.put(GROUP_ID, groupId);
- kafkaConf.put(BROKER_LIST_KEY, brokerList);
- kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
- kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
- if(kafkaConf.get(CONSUMER_TIMEOUT) == null) {
- kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT);
+
+ //Broker List
+ // If there is no value we need to check and set the old param and log a warning message
+ if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
+ String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+ if (brokerList == null || brokerList.isEmpty()) {
+ throw new ConfigurationException("Bootstrap Servers must be specified");
+ } else {
+ ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+ }
}
- kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
- LOGGER.info(kafkaConf.toString());
- parseAsFlumeEvent =
- ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
-
- boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET,
- DEFAULT_READ_SMALLEST_OFFSET);
- // If the data is to be parsed as Flume events, we always read the smallest.
- // Else, we read the configuration, which by default reads the largest.
- if (parseAsFlumeEvent || readSmallest) {
- // readSmallest is eval-ed only if parseAsFlumeEvent is false.
- // The default is largest, so we don't need to set it explicitly.
- kafkaConf.put("auto.offset.reset", "smallest");
+
+ //GroupId
+ // If there is an old Group Id set, then use that if no groupId is set.
+ if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
+ String oldGroupId = ctx.getString(GROUP_ID_FLUME);
+ if ( oldGroupId != null && !oldGroupId.isEmpty()) {
+ ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
+ logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+ }
}
- if (counter == null) {
- counter = new KafkaChannelCounter(getName());
+ if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) {
+ Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
+ String auto;
+ if (oldReadSmallest != null) {
+ if (oldReadSmallest) {
+ auto = "earliest";
+ } else {
+ auto = "latest";
+ }
+ ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
+ logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ }
+
+ }
+ }
+
+
+ private void setProducerProps(Context ctx, String bootStrapServers) {
+ producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
+ //Defaults overridden based on config
+ producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+ logger.info("Producer properties: " + producerProps.toString());
+ }
+
+ protected Properties getProducerProps() {
+ return producerProps;
+ }
+
+ private void setConsumerProps(Context ctx, String bootStrapServers) {
+ String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+ if (groupId == null || groupId.isEmpty()) {
+ groupId = DEFAULT_GROUP_ID;
+ logger.info("Group ID was not specified. Using {} as the group id.", groupId);
}
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
+ //Defaults overridden based on config
+ consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
+ //These always take precedence over config
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ logger.info(consumerProps.toString());
+ }
+ protected Properties getConsumerProps() { return consumerProps; }
+
+
+ private synchronized ConsumerAndRecords createConsumerAndRecords() {
+ try {
+ KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
+ ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
+ logger.info("Created new consumer to connect to Kafka");
+ car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
+ car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+ consumers.add(car);
+ return car;
+ } catch (Exception e) {
+ throw new FlumeException("Unable to connect to Kafka", e);
+ }
}
- private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
+ private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
if (c.failedEvents.isEmpty()) {
- c.consumer.commitOffsets();
+ c.commitOffsets();
}
c.failedEvents.clear();
- c.consumer.shutdown();
+ c.consumer.close();
}
- // Force a consumer to be initialized. There are many duplicates in
- // tests due to rebalancing - making testing tricky. In production,
- // this is less of an issue as
- // rebalancing would happen only on startup.
@VisibleForTesting
void registerThread() {
- consumerAndIter.get();
+ try {
+ consumerAndRecords.get();
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ e.printStackTrace();
+ }
}
private enum TransactionType {
@@ -233,54 +294,41 @@ public class KafkaChannel extends BasicChannelSemantics {
private class KafkaTransaction extends BasicTransactionSemantics {
private TransactionType type = TransactionType.NONE;
- // For Puts
private Optional<ByteArrayOutputStream> tempOutStream = Optional
- .absent();
-
- // For put transactions, serialize the events and batch them and send it.
- private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent();
+ .absent();
+ // For put transactions, serialize the events and hold them until the commit is requested.
+ private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent();
// For take transactions, deserialize and hold them till commit goes through
private Optional<LinkedList<Event>> events = Optional.absent();
private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
- Optional.absent();
+ Optional.absent();
private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
- Optional.absent();
+ Optional.absent();
+ private Optional<LinkedList<Future<RecordMetadata>>> kafkaFutures =
+ Optional.absent();
+ private final String batchUUID = UUID.randomUUID().toString();
// Fine to use null for initial value, Avro will create new ones if this
// is null
private BinaryEncoder encoder = null;
private BinaryDecoder decoder = null;
- private final String batchUUID = UUID.randomUUID().toString();
private boolean eventTaken = false;
@Override
+ protected void doBegin() throws InterruptedException {
+ rebalanceFlag.set(false);
+ }
+
+ @Override
protected void doPut(Event event) throws InterruptedException {
type = TransactionType.PUT;
- if (!serializedEvents.isPresent()) {
- serializedEvents = Optional.of(new LinkedList<byte[]>());
+ if (!producerRecords.isPresent()) {
+ producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
}
-
+ String key = event.getHeaders().get(KEY_HEADER);
try {
- if (parseAsFlumeEvent) {
- if (!tempOutStream.isPresent()) {
- tempOutStream = Optional.of(new ByteArrayOutputStream());
- }
- if (!writer.isPresent()) {
- writer = Optional.of(new
- SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
- }
- tempOutStream.get().reset();
- AvroFlumeEvent e = new AvroFlumeEvent(
- toCharSeqMap(event.getHeaders()),
- ByteBuffer.wrap(event.getBody()));
- encoder = EncoderFactory.get()
- .directBinaryEncoder(tempOutStream.get(), encoder);
- writer.get().write(e, encoder);
- // Not really possible to avoid this copy :(
- serializedEvents.get().add(tempOutStream.get().toByteArray());
- } else {
- serializedEvents.get().add(event.getBody());
- }
+ producerRecords.get().add(new ProducerRecord<String, byte[]>
+ (topic.get(), key, serializeValue(event, parseAsFlumeEvent)));
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
@@ -291,53 +339,64 @@ public class KafkaChannel extends BasicChannelSemantics {
protected Event doTake() throws InterruptedException {
type = TransactionType.TAKE;
try {
- if (!(consumerAndIter.get().uuid.equals(channelUUID))) {
- LOGGER.info("UUID mismatch, creating new consumer");
- decommissionConsumerAndIterator(consumerAndIter.get());
- consumerAndIter.remove();
+ if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
+ logger.info("UUID mismatch, creating new consumer");
+ decommissionConsumerAndRecords(consumerAndRecords.get());
+ consumerAndRecords.remove();
}
} catch (Exception ex) {
- LOGGER.warn("Error while shutting down consumer", ex);
+ logger.warn("Error while shutting down consumer", ex);
}
if (!events.isPresent()) {
events = Optional.of(new LinkedList<Event>());
}
Event e;
- if (!consumerAndIter.get().failedEvents.isEmpty()) {
- e = consumerAndIter.get().failedEvents.removeFirst();
+ // Give the channel a chance to commit if there has been a rebalance
+ if (rebalanceFlag.get()) {
+ logger.debug("Returning null event after Consumer rebalance.");
+ return null;
+ }
+ if (!consumerAndRecords.get().failedEvents.isEmpty()) {
+ e = consumerAndRecords.get().failedEvents.removeFirst();
} else {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Assignment: {}", consumerAndRecords.get().consumer.assignment().toString());
+ }
+
try {
- ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
long startTime = System.nanoTime();
- it.hasNext();
- long endTime = System.nanoTime();
- counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
- if (parseAsFlumeEvent) {
- ByteArrayInputStream in =
- new ByteArrayInputStream(it.next().message());
- decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
- if (!reader.isPresent()) {
- reader = Optional.of(
- new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
- }
- AvroFlumeEvent event = reader.get().read(null, decoder);
- e = EventBuilder.withBody(event.getBody().array(),
- toStringMap(event.getHeaders()));
- } else {
- e = EventBuilder.withBody(it.next().message(),
- Collections.EMPTY_MAP);
+ if (!consumerAndRecords.get().recordIterator.hasNext()) {
+ consumerAndRecords.get().poll();
}
+ if (consumerAndRecords.get().recordIterator.hasNext()) {
+ ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
+ e = deserializeValue(record.value(), parseAsFlumeEvent);
+ TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
+ consumerAndRecords.get().offsets.put(tp, oam);
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
+ }
- } catch (ConsumerTimeoutException ex) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Timed out while waiting for data to come from Kafka",
- ex);
+ //Add the key to the header
+ e.getHeaders().put(KEY_HEADER, record.key());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
+ }
+
+ long endTime = System.nanoTime();
+ counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+ } else {
+ return null;
}
- return null;
} catch (Exception ex) {
- LOGGER.warn("Error while getting events from Kafka", ex);
+ logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " +
+ "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex);
throw new ChannelException("Error while getting events from Kafka",
- ex);
+ ex);
}
}
eventTaken = true;
@@ -351,32 +410,41 @@ public class KafkaChannel extends BasicChannelSemantics {
return;
}
if (type.equals(TransactionType.PUT)) {
+ if (!kafkaFutures.isPresent()) {
+ kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
+ }
try {
- List<KeyedMessage<String, byte[]>> messages = new
- ArrayList<KeyedMessage<String, byte[]>>(serializedEvents.get()
- .size());
- for (byte[] event : serializedEvents.get()) {
- messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
- batchUUID, event));
- }
+ long batchSize = producerRecords.get().size();
long startTime = System.nanoTime();
- producer.send(messages);
+ int index = 0;
+ for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
+ index++;
+ kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
+ }
+ //prevents linger.ms from being a problem
+ producer.flush();
+
+ for (Future<RecordMetadata> future : kafkaFutures.get()) {
+ future.get();
+ }
long endTime = System.nanoTime();
- counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
- counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
- serializedEvents.get().clear();
+ counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
+ counter.addToEventPutSuccessCount(batchSize);
+ producerRecords.get().clear();
+ kafkaFutures.get().clear();
} catch (Exception ex) {
- LOGGER.warn("Sending events to Kafka failed", ex);
+ logger.warn("Sending events to Kafka failed", ex);
throw new ChannelException("Commit failed as send to Kafka failed",
- ex);
+ ex);
}
} else {
- if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+ if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
long startTime = System.nanoTime();
- consumerAndIter.get().consumer.commitOffsets();
+ consumerAndRecords.get().commitOffsets();
long endTime = System.nanoTime();
- counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000));
- }
+ counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
+ consumerAndRecords.get().printCurrentAssignment();
+ }
counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
events.get().clear();
}
@@ -388,37 +456,66 @@ public class KafkaChannel extends BasicChannelSemantics {
return;
}
if (type.equals(TransactionType.PUT)) {
- serializedEvents.get().clear();
+ producerRecords.get().clear();
+ kafkaFutures.get().clear();
} else {
counter.addToRollbackCounter(Long.valueOf(events.get().size()));
- consumerAndIter.get().failedEvents.addAll(events.get());
+ consumerAndRecords.get().failedEvents.addAll(events.get());
events.get().clear();
}
}
- }
-
- private class ConsumerAndIterator {
- final ConsumerConnector consumer;
- final ConsumerIterator<byte[], byte[]> iterator;
- final String uuid;
- final LinkedList<Event> failedEvents = new LinkedList<Event>();
+ private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOException {
+ byte[] bytes;
+ if (parseAsFlumeEvent) {
+ if (!tempOutStream.isPresent()) {
+ tempOutStream = Optional.of(new ByteArrayOutputStream());
+ }
+ if (!writer.isPresent()) {
+ writer = Optional.of(new
+ SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+ }
+ tempOutStream.get().reset();
+ AvroFlumeEvent e = new AvroFlumeEvent(
+ toCharSeqMap(event.getHeaders()),
+ ByteBuffer.wrap(event.getBody()));
+ encoder = EncoderFactory.get()
+ .directBinaryEncoder(tempOutStream.get(), encoder);
+ writer.get().write(e, encoder);
+ encoder.flush();
+ bytes = tempOutStream.get().toByteArray();
+ } else {
+ bytes = event.getBody();
+ }
+ return bytes;
+ }
- ConsumerAndIterator(ConsumerConnector consumerConnector,
- ConsumerIterator<byte[], byte[]> iterator, String uuid) {
- this.consumer = consumerConnector;
- this.iterator = iterator;
- this.uuid = uuid;
+ private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
+ Event e;
+ if (parseAsFlumeEvent) {
+ ByteArrayInputStream in =
+ new ByteArrayInputStream(value);
+ decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+ if (!reader.isPresent()) {
+ reader = Optional.of(
+ new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
+ }
+ AvroFlumeEvent event = reader.get().read(null, decoder);
+ e = EventBuilder.withBody(event.getBody().array(),
+ toStringMap(event.getHeaders()));
+ } else {
+ e = EventBuilder.withBody(value, Collections.EMPTY_MAP);
+ }
+ return e;
}
}
/**
* Helper function to convert a map of String to a map of CharSequence.
*/
- private static Map<CharSequence, CharSequence> toCharSeqMap(
- Map<String, String> stringMap) {
+ private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
Map<CharSequence, CharSequence> charSeqMap =
- new HashMap<CharSequence, CharSequence>();
+ new HashMap<CharSequence, CharSequence>();
for (Map.Entry<String, String> entry : stringMap.entrySet()) {
charSeqMap.put(entry.getKey(), entry.getValue());
}
@@ -428,13 +525,105 @@ public class KafkaChannel extends BasicChannelSemantics {
/**
* Helper function to convert a map of CharSequence to a map of String.
*/
- private static Map<String, String> toStringMap(
- Map<CharSequence, CharSequence> charSeqMap) {
- Map<String, String> stringMap =
- new HashMap<String, String>();
+ private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
+ Map<String, String> stringMap = new HashMap<String, String>();
for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
stringMap.put(entry.getKey().toString(), entry.getValue().toString());
}
return stringMap;
}
+
+ /* Object to store our consumer */
+ private class ConsumerAndRecords {
+ final KafkaConsumer<String, byte[]> consumer;
+ final String uuid;
+ final LinkedList<Event> failedEvents = new LinkedList<Event>();
+
+ ConsumerRecords<String, byte[]> records;
+ Iterator<ConsumerRecord<String, byte[]>> recordIterator;
+ Map<TopicPartition, OffsetAndMetadata> offsets;
+
+ ConsumerAndRecords(KafkaConsumer<String, byte[]> consumer, String uuid) {
+ this.consumer = consumer;
+ this.uuid = uuid;
+ this.records = ConsumerRecords.empty();
+ this.recordIterator = records.iterator();
+ }
+
+ void poll() {
+ this.records = consumer.poll(pollTimeout);
+ this.recordIterator = records.iterator();
+ logger.trace("polling");
+ }
+
+ void commitOffsets() {
+ this.consumer.commitSync(offsets);
+ }
+
+ // This will reset the latest assigned partitions to the last committed offsets;
+
+ public void printCurrentAssignment() {
+ StringBuilder sb = new StringBuilder();
+ for (TopicPartition tp : this.consumer.assignment()) {
+ try {
+ sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset())
+ .append(",").append(this.consumer.committed(tp).metadata()).append("]");
+ if (logger.isDebugEnabled()) {
+ logger.debug(sb.toString());
+ }
+ } catch (NullPointerException npe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committed {}", tp);
+ }
+ }
+ }
+ }
+ }
+}
+
+// Throw exception if there is an error
+class ChannelCallback implements Callback {
+ private static final Logger log = LoggerFactory.getLogger(ChannelCallback.class);
+ private int index;
+ private long startTime;
+
+ public ChannelCallback(int index, long startTime) {
+ this.index = index;
+ this.startTime = startTime;
+ }
+
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ log.trace("Error sending message to Kafka due to " + exception.getMessage());
+ }
+ if (log.isDebugEnabled()) {
+ long batchElapsedTime = System.currentTimeMillis() - startTime;
+ log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" +
+ metadata.offset() + "-" + batchElapsedTime);
+ }
+ }
+}
+
+class ChannelRebalanceListener implements ConsumerRebalanceListener {
+ private static final Logger log = LoggerFactory.getLogger(ChannelRebalanceListener.class);
+ private AtomicBoolean rebalanceFlag;
+
+ public ChannelRebalanceListener(AtomicBoolean rebalanceFlag) {
+ this.rebalanceFlag = rebalanceFlag;
+ }
+
+ // Set a flag that a rebalance has occurred. Then we can commit the currently written transactions
+ // on the next doTake() pass.
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ for (TopicPartition partition : partitions) {
+ log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+ rebalanceFlag.set(true);
+ }
+ }
+
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ for (TopicPartition partition : partitions) {
+ log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+ }
+ }
}
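
The take side above hinges on the new consumer's manual offset management: auto-commit is disabled, the channel records offset + 1 per partition as it hands out events, and commitSync() runs only once the transaction succeeds. A minimal, self-contained sketch of that pattern against the 0.9 API (broker address, topic and class name are placeholders; the real channel additionally handles rebalances via ChannelRebalanceListener and replays failedEvents):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TakeSideSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "flume");
    props.put("enable.auto.commit", "false");         // the channel commits itself
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Arrays.asList("flume-channel"));

    // Offsets to commit once the surrounding transaction succeeds.
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();

    ConsumerRecords<String, byte[]> records = consumer.poll(500); // pollTimeout
    for (ConsumerRecord<String, byte[]> record : records) {
      // ... hand record.value() to the taking transaction ...
      // The committed offset is the NEXT position to read, hence offset + 1.
      offsets.put(new TopicPartition(record.topic(), record.partition()),
          new OffsetAndMetadata(record.offset() + 1));
    }
    consumer.commitSync(offsets); // only after downstream success
    consumer.close();
  }
}

The offset + 1 mirrors doTake(): Kafka treats a committed offset as the next position to read, so committing record.offset() itself would replay the last event after a restart.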
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 9a342ef..faf46b6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -18,27 +18,45 @@
*/
package org.apache.flume.channel.kafka;
+import org.apache.kafka.clients.CommonClientConfigs;
+
public class KafkaChannelConfiguration {
public static final String KAFKA_PREFIX = "kafka.";
+ public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+ public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
+ public static final String DEFAULT_ACKS = "all";
+ public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+ public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+ public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
+ public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String DEFAULT_TOPIC = "flume-channel";
+ public static final String DEFAULT_GROUP_ID = "flume";
+ public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
+ public static final long DEFAULT_POLL_TIMEOUT = 500;
+
+ public static final String KEY_HEADER = "key";
+
+ public static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
+
+ public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
+ public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
+
+ /*** Old Configuration Parameters ****/
public static final String BROKER_LIST_KEY = "metadata.broker.list";
public static final String REQUIRED_ACKS_KEY = "request.required.acks";
public static final String BROKER_LIST_FLUME_KEY = "brokerList";
- public static final String TOPIC = "topic";
- public static final String GROUP_ID = "group.id";
+ //public static final String TOPIC = "topic";
public static final String GROUP_ID_FLUME = "groupId";
public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
- public static final String DEFAULT_GROUP_ID = "flume";
- public static final String DEFAULT_TOPIC = "flume-channel";
public static final String TIMEOUT = "timeout";
public static final String DEFAULT_TIMEOUT = "100";
public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
- public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
- public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
-
public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
}
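
For illustration only (agent/channel names, hosts and the compression setting are hypothetical), an agent configuration using the scheme these constants define might look like:

a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
# BOOTSTRAP_SERVERS_CONFIG ("kafka.bootstrap.servers") is required
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
# TOPIC_CONFIG ("kafka.topic") defaults to flume-channel
a1.channels.k1.kafka.topic = channel1
# POLL_TIMEOUT ("kafka.pollTimeout") defaults to 500 ms
a1.channels.k1.kafka.pollTimeout = 500
a1.channels.k1.parseAsFlumeEvent = true
# Anything under kafka.consumer.* / kafka.producer.* is passed through to the
# Kafka client with the prefix stripped:
a1.channels.k1.kafka.consumer.group.id = flume
a1.channels.k1.kafka.producer.compression.type = snappy

setConsumerProps()/setProducerProps() in KafkaChannel apply these sub-properties over the serializer/acks defaults, while bootstrap.servers, group.id and enable.auto.commit=false always take precedence, as noted in the code above.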
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 319e779..637428d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -19,13 +19,8 @@
package org.apache.flume.channel.kafka;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -33,17 +28,26 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.*;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
public class TestKafkaChannel {
+ private final static Logger LOGGER =
+ LoggerFactory.getLogger(TestKafkaChannel.class);
+
private static TestUtil testUtil = TestUtil.getInstance();
private String topic = null;
private final Set<String> usedTopics = new HashSet<String>();
@@ -78,11 +82,52 @@ public class TestKafkaChannel {
testUtil.tearDown();
}
+ //Make sure the props are picked up correctly.
+ @Test
+ public void testProps() throws Exception {
+ Context context = new Context();
+ context.put("kafka.producer.some-parameter", "1");
+ context.put("kafka.consumer.another-parameter", "1");
+ context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+ context.put(TOPIC_CONFIG, topic);
+
+ final KafkaChannel channel = new KafkaChannel();
+ Configurables.configure(channel, context);
+
+ Properties consumerProps = channel.getConsumerProps();
+ Properties producerProps = channel.getProducerProps();
+
+ Assert.assertEquals(producerProps.getProperty("some-parameter"), "1");
+ Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
+ }
+
+ @Test
+ public void testOldConfig() throws Exception {
+ Context context = new Context();
+ context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl());
+ context.put(GROUP_ID_FLUME,"flume-something");
+ context.put(READ_SMALLEST_OFFSET,"true");
+ context.put("topic",topic);
+
+ final KafkaChannel channel = new KafkaChannel();
+ Configurables.configure(channel, context);
+
+ Properties consumerProps = channel.getConsumerProps();
+ Properties producerProps = channel.getProducerProps();
+
+ Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl());
+ Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something");
+ Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+
+ }
+
+
@Test
public void testSuccess() throws Exception {
doTestSuccessRollback(false, false);
}
+
@Test
public void testSuccessInterleave() throws Exception {
doTestSuccessRollback(false, true);
@@ -99,7 +144,7 @@ public class TestKafkaChannel {
}
private void doTestSuccessRollback(final boolean rollback,
- final boolean interleave) throws Exception {
+ final boolean interleave) throws Exception {
final KafkaChannel channel = startChannel(true);
writeAndVerify(rollback, channel, interleave);
channel.stop();
@@ -122,82 +167,89 @@ public class TestKafkaChannel {
}
@Test
- public void testNoParsingAsFlumeAgent() throws Exception {
+ public void testParseAsFlumeEventFalse() throws Exception {
+ doParseAsFlumeEventFalse(false);
+ }
+
+ @Test
+ public void testParseAsFlumeEventFalseCheckHeader() throws Exception {
+ doParseAsFlumeEventFalse(true);
+ }
+
+ @Test
+ public void testParseAsFlumeEventFalseAsSource() throws Exception {
+ doParseAsFlumeEventFalseAsSource(false);
+ }
+
+ @Test
+ public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
+ doParseAsFlumeEventFalseAsSource(true);
+ }
+
+ private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
final KafkaChannel channel = startChannel(false);
- Producer<String, byte[]> producer = new Producer<String, byte[]>(
- new ProducerConfig(channel.getKafkaConf()));
- List<KeyedMessage<String, byte[]>> original = Lists.newArrayList();
+ Properties props = channel.getProducerProps();
+ KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
+
for (int i = 0; i < 50; i++) {
- KeyedMessage<String, byte[]> data = new KeyedMessage<String,
- byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6),
- String.valueOf(i).getBytes());
- original.add(data);
+ ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes());
+ producer.send(data).get();
}
- producer.send(original);
ExecutorCompletionService<Void> submitterSvc = new
- ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+ ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
List<Event> events = pullEvents(channel, submitterSvc,
- 50, false, false);
+ 50, false, false);
wait(submitterSvc, 5);
- Set<Integer> finals = Sets.newHashSet();
+ Map<Integer, String> finals = new HashMap<Integer, String>();
for (int i = 0; i < 50; i++) {
- finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+ finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
- Assert.assertTrue(finals.contains(i));
+ Assert.assertTrue(finals.keySet().contains(i));
+ if (checkHeaders) {
+ Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+ }
finals.remove(i);
}
Assert.assertTrue(finals.isEmpty());
channel.stop();
}
- @Test
- public void testTimeoutConfig() throws Exception {
- Context context = prepareDefaultContext(true);
- KafkaChannel channel = new KafkaChannel();
- Configurables.configure(channel, context);
- Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
- .equals(DEFAULT_TIMEOUT));
-
- String timeout = "1000";
- context.put("kafka."+CONSUMER_TIMEOUT, timeout);
- channel = new KafkaChannel();
- Configurables.configure(channel, context);
- Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
- .equals(timeout));
- }
-
/**
* Like the previous test but here we write to the channel like a Flume source would do
* to verify that the events are written as text and not as an Avro object
*
* @throws Exception
*/
- @Test
- public void testWritingToNoParsingAsFlumeAgent() throws Exception {
+ public void doParseAsFlumeEventFalseAsSource(Boolean checkHeaders) throws Exception {
final KafkaChannel channel = startChannel(false);
List<String> msgs = new ArrayList<String>();
- for (int i = 0; i < 50; i++){
+ Map<String, String> headers = new HashMap<String, String>();
+ for (int i = 0; i < 50; i++) {
msgs.add(String.valueOf(i));
}
Transaction tx = channel.getTransaction();
tx.begin();
- for (int i = 0; i < msgs.size(); i++){
- channel.put(EventBuilder.withBody(msgs.get(i).getBytes()));
+ for (int i = 0; i < msgs.size(); i++) {
+ headers.put(KEY_HEADER, String.valueOf(i) + "-header");
+ channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
}
tx.commit();
ExecutorCompletionService<Void> submitterSvc = new
ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
List<Event> events = pullEvents(channel, submitterSvc,
- 50, false, false);
+ 50, false, false);
wait(submitterSvc, 5);
- Set<Integer> finals = Sets.newHashSet();
+ Map<Integer, String> finals = new HashMap<Integer, String>();
for (int i = 0; i < 50; i++) {
- finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+ finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
- Assert.assertTrue(finals.contains(i));
+ Assert.assertTrue(finals.keySet().contains(i));
+ if (checkHeaders) {
+ Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
+ }
finals.remove(i);
}
Assert.assertTrue(finals.isEmpty());
@@ -216,12 +268,12 @@ public class TestKafkaChannel {
* @throws Exception
*/
private void doTestStopAndStart(boolean rollback,
- boolean retryAfterRollback) throws Exception {
+ boolean retryAfterRollback) throws Exception {
final KafkaChannel channel = startChannel(true);
ExecutorService underlying = Executors
- .newCachedThreadPool();
+ .newCachedThreadPool();
ExecutorCompletionService<Void> submitterSvc =
- new ExecutorCompletionService<Void>(underlying);
+ new ExecutorCompletionService<Void>(underlying);
final List<List<Event>> events = createBaseList();
putEvents(channel, events, submitterSvc);
int completed = 0;
@@ -233,14 +285,14 @@ public class TestKafkaChannel {
total = 40;
}
final List<Event> eventsPulled =
- pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
+ pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
wait(submitterSvc, 5);
channel2.stop();
if (!retryAfterRollback && rollback) {
final KafkaChannel channel3 = startChannel(true);
int expectedRemaining = 50 - eventsPulled.size();
final List<Event> eventsPulled2 =
- pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
+ pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
wait(submitterSvc, 5);
Assert.assertEquals(expectedRemaining, eventsPulled2.size());
eventsPulled.addAll(eventsPulled2);
@@ -259,18 +311,18 @@ public class TestKafkaChannel {
}
private void writeAndVerify(final boolean testRollbacks,
- final KafkaChannel channel) throws Exception {
+ final KafkaChannel channel) throws Exception {
writeAndVerify(testRollbacks, channel, false);
}
private void writeAndVerify(final boolean testRollbacks,
- final KafkaChannel channel, final boolean interleave) throws Exception {
+ final KafkaChannel channel, final boolean interleave) throws Exception {
final List<List<Event>> events = createBaseList();
ExecutorCompletionService<Void> submitterSvc =
- new ExecutorCompletionService<Void>(Executors
- .newCachedThreadPool());
+ new ExecutorCompletionService<Void>(Executors
+ .newCachedThreadPool());
putEvents(channel, events, submitterSvc);
@@ -279,11 +331,11 @@ public class TestKafkaChannel {
}
ExecutorCompletionService<Void> submitterSvc2 =
- new ExecutorCompletionService<Void>(Executors
- .newCachedThreadPool());
+ new ExecutorCompletionService<Void>(Executors
+ .newCachedThreadPool());
final List<Event> eventsPulled =
- pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+ pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
if (!interleave) {
wait(submitterSvc, 5);
@@ -301,7 +353,7 @@ public class TestKafkaChannel {
for (int j = 0; j < 10; j++) {
Map<String, String> hdrs = new HashMap<String, String>();
String v = (String.valueOf(i) + " - " + String
- .valueOf(j));
+ .valueOf(j));
hdrs.put("header", v);
eventList.add(EventBuilder.withBody(v.getBytes(), hdrs));
}
@@ -310,7 +362,7 @@ public class TestKafkaChannel {
}
private void putEvents(final KafkaChannel channel, final List<List<Event>>
- events, ExecutorCompletionService<Void> submitterSvc) {
+ events, ExecutorCompletionService<Void> submitterSvc) {
for (int i = 0; i < 5; i++) {
final int index = i;
submitterSvc.submit(new Callable<Void>() {
@@ -334,10 +386,10 @@ public class TestKafkaChannel {
}
private List<Event> pullEvents(final KafkaChannel channel,
- ExecutorCompletionService<Void> submitterSvc, final int total,
- final boolean testRollbacks, final boolean retryAfterRollback) {
+ ExecutorCompletionService<Void> submitterSvc, final int total,
+ final boolean testRollbacks, final boolean retryAfterRollback) {
final List<Event> eventsPulled = Collections.synchronizedList(new
- ArrayList<Event>(50));
+ ArrayList<Event>(50));
final CyclicBarrier barrier = new CyclicBarrier(5);
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger rolledBackCount = new AtomicInteger(0);
@@ -366,9 +418,9 @@ public class TestKafkaChannel {
eventsLocal.add(e);
} else {
if (testRollbacks &&
- index == 4 &&
- (!rolledBack.get()) &&
- startedGettingEvents.get()) {
+ index == 4 &&
+ (!rolledBack.get()) &&
+ startedGettingEvents.get()) {
tx.rollback();
tx.close();
tx = null;
@@ -407,7 +459,7 @@ public class TestKafkaChannel {
}
private void wait(ExecutorCompletionService<Void> submitterSvc, int max)
- throws Exception {
+ throws Exception {
int completed = 0;
while (completed < max) {
submitterSvc.take();
@@ -420,8 +472,7 @@ public class TestKafkaChannel {
Assert.assertEquals(50, eventsPulled.size());
Set<String> eventStrings = new HashSet<String>();
for (Event e : eventsPulled) {
- Assert
- .assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
+ Assert.assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
eventStrings.add(e.getHeaders().get("header"));
}
for (int i = 0; i < 5; i++) {
@@ -437,14 +488,10 @@ public class TestKafkaChannel {
private Context prepareDefaultContext(boolean parseAsFlume) {
// Prepares a default context with Kafka Server Properties
Context context = new Context();
- context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY,
- testUtil.getKafkaServerUrl());
- context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY,
- testUtil.getZkUrl());
- context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT,
- String.valueOf(parseAsFlume));
- context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true");
- context.put(KafkaChannelConfiguration.TOPIC, topic);
+ context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+ context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
+ context.put(TOPIC_CONFIG, topic);
+
return context;
}
@@ -452,22 +499,18 @@ public class TestKafkaChannel {
int numPartitions = 5;
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
- ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
- sessionTimeoutMs, connectionTimeoutMs,
- ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
int replicationFactor = 1;
Properties topicConfig = new Properties();
- AdminUtils.createTopic(zkClient, topicName, numPartitions,
- replicationFactor, topicConfig);
+ AdminUtils.createTopic(zkUtils, topicName, numPartitions,
+ replicationFactor, topicConfig);
}
public static void deleteTopic(String topicName) {
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
- ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
- sessionTimeoutMs, connectionTimeoutMs,
- ZKStringSerializer$.MODULE$);
- AdminUtils.deleteTopic(zkClient, topicName);
+ ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+ AdminUtils.deleteTopic(zkUtils, topicName);
}
}
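
The put path these tests exercise follows KafkaTransaction.doCommit(): send each record asynchronously with a callback, flush() so linger.ms cannot hold the batch back, then block on every Future so a failed send fails the transaction before offsets move. A stripped-down sketch under those assumptions (broker, topic and class name are placeholders):

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PutSideSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("acks", "all");                         // DEFAULT_ACKS in the channel
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
    List<Future<RecordMetadata>> futures = new LinkedList<Future<RecordMetadata>>();

    for (int i = 0; i < 10; i++) {
      futures.add(producer.send(
          new ProducerRecord<String, byte[]>("flume-channel", String.valueOf(i),
              ("event-" + i).getBytes()),
          new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
              if (exception != null) {
                // the channel's ChannelCallback logs here; the failure still
                // surfaces when Future.get() rethrows below
                exception.printStackTrace();
              }
            }
          }));
    }
    producer.flush(); // don't let linger.ms delay the batch
    for (Future<RecordMetadata> f : futures) {
      f.get();        // rethrows any send failure before the commit succeeds
    }
    producer.close();
  }
}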
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index c10c89d..216bfd8 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -38,7 +38,7 @@ port=9092
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
-num.network.threads=2
+num.network.threads=4
# The number of threads doing disk I/O
num.io.threads=8
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 15f27c3..5149ab5 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2679,36 +2679,60 @@ The Kafka channel can be used for multiple scenarios:
* With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
* With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr
+
+This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.
+
+The configuration parameters are organized as follows:
+1) Configuration values related to the channel generically are applied at the channel config level, eg: a1.channels.k1.type
+2) Configuration values related to Kafka or how the channel operates are prefixed with "kafka." (these are analogous to Kafka's CommonClientConfigs), eg: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
+3) Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
+4) Where possible, the Kafka parameter names are used, eg: bootstrap.servers and acks
+
+This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.
+
Required properties are in **bold**.
-====================== ========================== ===============================================================================================================
-Property Name Default Description
-====================== ========================== ===============================================================================================================
-**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
-**brokerList** -- List of brokers in the Kafka cluster used by the channel
- This can be a partial list of brokers, but we recommend at least two for HA.
- The format is comma separated list of hostname:port
-**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster
- The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
- For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
-topic flume-channel Kafka topic which the channel will use
-groupId flume Consumer group ID the channel uses to register with Kafka.
- Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
- Note that having non-channel consumers with the same ID can lead to data loss.
-parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel.
- This should be true if Flume source is writing to the channel
- And false if other producers are writing into the topic that the channel is using
- Flume source messages to Kafka can be parsed outside of Flume by using
- org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
-readSmallestOffset false When set to true, the channel will read all data in the topic, starting from the oldest event
- when false, it will read only events written after the channel started
- When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this
- guarantees that events sent by source before sinks start will not be lost.
-Other Kafka Properties -- These properties are used to configure the Kafka Producer and Consumer used by the channel.
- Any property supported by Kafka can be used.
- The only requirement is to prepend the property name with the prefix ``kafka.``.
- For example: kafka.producer.type
-====================== ========================== ===============================================================================================================
+================================ ========================== ===============================================================================================================
+Property Name Default Description
+================================ ========================== ===============================================================================================================
+**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the channel
+ This can be a partial list of brokers, but we recommend at least two for HA.
+ The format is a comma-separated list of hostname:port
+kafka.topic flume-channel Kafka topic which the channel will use
+kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka.
+ Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
+ Note that having non-channel consumers with the same ID can lead to data loss.
+
+parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel.
+ This should be true if Flume source is writing to the channel and false if other producers are
+ writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
+ org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
+pollTimeout 500 The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
+ https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
+ (e.g. because that data has been deleted):
+ earliest: automatically reset the offset to the earliest offset
+ latest: automatically reset the offset to the latest offset
+ none: throw exception to the consumer if no previous offset is found for the consumer's group
+ anything else: throw exception to the consumer.
+================================ ========================== ===============================================================================================================
+
+Deprecated Properties
+
+================================ ========================== ===============================================================================================================
+Property Name Default Description
+================================ ========================== ===============================================================================================================
+brokerList -- List of brokers in the Kafka cluster used by the channel
+ This can be a partial list of brokers, but we recommend at least two for HA.
+ The format is a comma-separated list of hostname:port
+topic flume-channel Use kafka.topic
+groupId flume Use kafka.consumer.group.id
+readSmallestOffset false Use kafka.consumer.auto.offset.reset
+
+================================ ========================== ===============================================================================================================
.. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up
@@ -2716,12 +2740,12 @@ Example for agent named a1:
.. code-block:: properties
- a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+ a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
- a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
- a1.channels.channel1.topic=channel1
- a1.channels.channel1.zookeeperConnect=kafka-1:2181
+ a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+ a1.channels.channel1.kafka.topic = channel1
+ a1.channels.channel1.kafka.consumer.group.id = flume-consumer
File Channel
~~~~~~~~~~~~