You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by be...@apache.org on 2016/12/06 07:31:14 UTC
flume git commit: FLUME-3027. Change Kafka Channel to clear offsets map after commit
Repository: flume
Updated Branches:
refs/heads/trunk ed9f6ff6f -> fa1ee05af
FLUME-3027. Change Kafka Channel to clear offsets map after commit
This change adds a call to clear the offsets map after a commit so as to avoid repeatedly committing already-committed offsets. Also updates various debug and trace log messages/calls to help with troubleshooting.
This closes #92
Reviewers: Attila Simon, Bessenyei Balázs Donát
(Jeff Holoman via Bessenyei Balázs Donát)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fa1ee05a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fa1ee05a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fa1ee05a
Branch: refs/heads/trunk
Commit: fa1ee05af38bcf08ed18ff36d4284e68836a9054
Parents: ed9f6ff
Author: Jeff Holoman <je...@gmail.com>
Authored: Wed Nov 23 10:52:06 2016 -0500
Committer: Bessenyei Balázs Donát <be...@apache.org>
Committed: Tue Dec 6 07:28:28 2016 +0000
----------------------------------------------------------------------
.../flume/channel/kafka/KafkaChannel.java | 129 ++++++++++++-------
1 file changed, 80 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/fa1ee05a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index cc7bb48..6684bea 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,9 +103,9 @@ public class KafkaChannel extends BasicChannelSemantics {
private Integer staticPartitionId;
private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
- //used to indicate if a rebalance has occurred during the current transaction
+ // used to indicate if a rebalance has occurred during the current transaction
AtomicBoolean rebalanceFlag = new AtomicBoolean();
- //This isn't a Kafka property per se, but we allow it to be configurable
+ // This isn't a Kafka property per se, but we allow it to be configurable
private long pollTimeout = DEFAULT_POLL_TIMEOUT;
@@ -154,8 +155,7 @@ public class KafkaChannel extends BasicChannelSemantics {
producer.close();
counter.stop();
super.stop();
- logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
- counter);
+ logger.info("Kafka channel {} stopped.", getName());
}
@Override
@@ -166,7 +166,7 @@ public class KafkaChannel extends BasicChannelSemantics {
@Override
public void configure(Context ctx) {
- //Can remove in the next release
+ // Can remove in the next release
translateOldProps(ctx);
topicStr = ctx.getString(TOPIC_CONFIG);
@@ -217,7 +217,7 @@ public class KafkaChannel extends BasicChannelSemantics {
logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
}
- //Broker List
+ // Broker List
// If there is no value we need to check and set the old param and log a warning message
if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
@@ -230,7 +230,7 @@ public class KafkaChannel extends BasicChannelSemantics {
}
}
- //GroupId
+ // GroupId
// If there is an old Group Id set, then use that if no groupId is set.
if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
String oldGroupId = ctx.getString(GROUP_ID_FLUME);
@@ -265,7 +265,7 @@ public class KafkaChannel extends BasicChannelSemantics {
producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
- //Defaults overridden based on config
+ // Defaults overridden based on config
producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
}
@@ -279,9 +279,9 @@ public class KafkaChannel extends BasicChannelSemantics {
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
- //Defaults overridden based on config
+ // Defaults overridden based on config
consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
- //These always take precedence over config
+ // These always take precedence over config
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -313,8 +313,7 @@ public class KafkaChannel extends BasicChannelSemantics {
try {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
if (!kafkaOffsets.isEmpty()) {
- logger.info("Found Kafka offsets for topic " + topicStr +
- ". Will not migrate from zookeeper");
+ logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
logger.debug("Offsets found: {}", kafkaOffsets);
return;
}
@@ -373,6 +372,7 @@ public class KafkaChannel extends BasicChannelSemantics {
}
private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
+ c.consumer.wakeup();
c.consumer.close();
}
@@ -434,7 +434,7 @@ public class KafkaChannel extends BasicChannelSemantics {
if (staticPartitionId != null) {
partitionId = staticPartitionId;
}
- //Allow a specified header to override a static ID
+ // Allow a specified header to override a static ID
if (partitionHeader != null) {
String headerVal = event.getHeaders().get(partitionHeader);
if (headerVal != null) {
@@ -460,6 +460,7 @@ public class KafkaChannel extends BasicChannelSemantics {
@SuppressWarnings("unchecked")
@Override
protected Event doTake() throws InterruptedException {
+ logger.trace("Starting event take");
type = TransactionType.TAKE;
try {
if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
@@ -482,11 +483,10 @@ public class KafkaChannel extends BasicChannelSemantics {
if (!consumerAndRecords.get().failedEvents.isEmpty()) {
e = consumerAndRecords.get().failedEvents.removeFirst();
} else {
-
- if (logger.isDebugEnabled()) {
- logger.debug("Assigment: {}", consumerAndRecords.get().consumer.assignment().toString());
+ if ( logger.isTraceEnabled() ) {
+ logger.trace("Assignment during take: {}",
+ consumerAndRecords.get().consumer.assignment().toString());
}
-
try {
long startTime = System.nanoTime();
if (!consumerAndRecords.get().recordIterator.hasNext()) {
@@ -497,24 +497,20 @@ public class KafkaChannel extends BasicChannelSemantics {
e = deserializeValue(record.value(), parseAsFlumeEvent);
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
- consumerAndRecords.get().offsets.put(tp, oam);
-
- if (logger.isTraceEnabled()) {
- logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
- }
+ consumerAndRecords.get().saveOffsets(tp,oam);
//Add the key to the header
if (record.key() != null) {
e.getHeaders().put(KEY_HEADER, record.key());
}
- if (logger.isDebugEnabled()) {
- logger.debug("Processed output from partition {} offset {}",
- record.partition(), record.offset());
- }
-
long endTime = System.nanoTime();
counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} processed output from partition {} offset {}",
+ new Object[] {getName(), record.partition(), record.offset()});
+ }
} else {
return null;
}
@@ -532,6 +528,7 @@ public class KafkaChannel extends BasicChannelSemantics {
@Override
protected void doCommit() throws InterruptedException {
+ logger.trace("Starting commit");
if (type.equals(TransactionType.NONE)) {
return;
}
@@ -564,15 +561,24 @@ public class KafkaChannel extends BasicChannelSemantics {
ex);
}
} else {
+ // event taken ensures that we have collected events in this transaction
+ // before committing
if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
+ logger.trace("About to commit batch");
long startTime = System.nanoTime();
consumerAndRecords.get().commitOffsets();
long endTime = System.nanoTime();
counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
- consumerAndRecords.get().printCurrentAssignment();
+ if (logger.isDebugEnabled()) {
+ logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
+ }
+ }
+
+ int takes = events.get().size();
+ if (takes > 0) {
+ counter.addToEventTakeSuccessCount(takes);
+ events.get().clear();
}
- counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
- events.get().clear();
}
}
@@ -585,7 +591,7 @@ public class KafkaChannel extends BasicChannelSemantics {
producerRecords.get().clear();
kafkaFutures.get().clear();
} else {
- counter.addToRollbackCounter(Long.valueOf(events.get().size()));
+ counter.addToRollbackCounter(events.get().size());
consumerAndRecords.get().failedEvents.addAll(events.get());
events.get().clear();
}
@@ -676,34 +682,59 @@ public class KafkaChannel extends BasicChannelSemantics {
this.recordIterator = records.iterator();
}
- void poll() {
- this.records = consumer.poll(pollTimeout);
- this.recordIterator = records.iterator();
- logger.trace("polling");
+ private void poll() {
+ logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
+ try {
+ records = consumer.poll(pollTimeout);
+ recordIterator = records.iterator();
+ logger.debug("{} returned {} records from last poll", getName(), records.count());
+ } catch (WakeupException e) {
+ logger.trace("Consumer woken up for channel {}.", getName());
+ }
}
- void commitOffsets() {
- this.consumer.commitSync(offsets);
+ private void commitOffsets() {
+ try {
+ consumer.commitSync(offsets);
+ } catch (Exception e) {
+ logger.info("Error committing offsets.", e);
+ } finally {
+ logger.trace("About to clear offsets map.");
+ offsets.clear();
+ }
}
- // This will reset the latest assigned partitions to the last committed offsets;
+ private String getOffsetMapString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getName()).append(" current offsets map: ");
+ for (TopicPartition tp : offsets.keySet()) {
+ sb.append("p").append(tp.partition()).append("-")
+ .append(offsets.get(tp).offset()).append(" ");
+ }
+ return sb.toString();
+ }
- public void printCurrentAssignment() {
+ // This prints the current committed offsets when debug is enabled
+ private String getCommittedOffsetsString() {
StringBuilder sb = new StringBuilder();
- for (TopicPartition tp : this.consumer.assignment()) {
+ sb.append(getName()).append(" committed: ");
+ for (TopicPartition tp : consumer.assignment()) {
try {
- sb.append("Committed: [").append(tp).append(",")
- .append(this.consumer.committed(tp).offset())
- .append(",").append(this.consumer.committed(tp).metadata()).append("]");
- if (logger.isDebugEnabled()) {
- logger.debug(sb.toString());
- }
+ sb.append("[").append(tp).append(",")
+ .append(consumer.committed(tp).offset())
+ .append("] ");
} catch (NullPointerException npe) {
- if (logger.isDebugEnabled()) {
- logger.debug("Committed {}", tp);
- }
+ logger.debug("Committed {}", tp);
}
}
+ return sb.toString();
+ }
+
+ private void saveOffsets(TopicPartition tp, OffsetAndMetadata oam) {
+ offsets.put(tp,oam);
+ if (logger.isTraceEnabled()) {
+ logger.trace(getOffsetMapString());
+ }
}
}
}