Posted to commits@flume.apache.org by be...@apache.org on 2016/12/06 07:31:14 UTC

flume git commit: FLUME-3027. Change Kafka Channel to clear offsets map after commit

Repository: flume
Updated Branches:
  refs/heads/trunk ed9f6ff6f -> fa1ee05af


FLUME-3027. Change Kafka Channel to clear offsets map after commit

This change adds a call to clear the offsets map after a commit to avoid repeatedly committing already-committed offsets. It also updates various debug and trace log messages and calls to help with troubleshooting.
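
For context, the fix amounts to clearing the per-transaction offsets map once commitSync returns, so that a later commit does not re-submit offsets that are already committed. Below is a minimal Java sketch of that pattern; the ChannelOffsets wrapper and its fields are hypothetical stand-ins for the ConsumerAndRecords inner class shown in the diff.

----------------------------------------------------------------------
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical wrapper illustrating the commit-then-clear pattern from
// this commit; the real code lives in KafkaChannel's ConsumerAndRecords.
class ChannelOffsets {
  private final Consumer<String, byte[]> consumer;
  private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

  ChannelOffsets(Consumer<String, byte[]> consumer) {
    this.consumer = consumer;
  }

  // Called for each record taken during the transaction.
  void save(TopicPartition tp, OffsetAndMetadata oam) {
    offsets.put(tp, oam);
  }

  // Called at transaction commit.
  void commit() {
    try {
      consumer.commitSync(offsets);
    } finally {
      // The fix: without this clear(), every later commit would
      // re-submit offsets that were already committed.
      offsets.clear();
    }
  }
}
----------------------------------------------------------------------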

This closes #92

Reviewers: Attila Simon, Bessenyei Balázs Donát

(Jeff Holoman via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fa1ee05a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fa1ee05a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fa1ee05a

Branch: refs/heads/trunk
Commit: fa1ee05af38bcf08ed18ff36d4284e68836a9054
Parents: ed9f6ff
Author: Jeff Holoman <je...@gmail.com>
Authored: Wed Nov 23 10:52:06 2016 -0500
Committer: Bessenyei Balázs Donát <be...@apache.org>
Committed: Tue Dec 6 07:28:28 2016 +0000

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       | 129 ++++++++++++-------
 1 file changed, 80 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/fa1ee05a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index cc7bb48..6684bea 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,9 +103,9 @@ public class KafkaChannel extends BasicChannelSemantics {
   private Integer staticPartitionId;
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
-  //used to indicate if a rebalance has occurred during the current transaction
+  // used to indicate if a rebalance has occurred during the current transaction
   AtomicBoolean rebalanceFlag = new AtomicBoolean();
-  //This isn't a Kafka property per se, but we allow it to be configurable
+  // This isn't a Kafka property per se, but we allow it to be configurable
   private long pollTimeout = DEFAULT_POLL_TIMEOUT;
 
 
@@ -154,8 +155,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     producer.close();
     counter.stop();
     super.stop();
-    logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
-            counter);
+    logger.info("Kafka channel {} stopped.", getName());
   }
 
   @Override
@@ -166,7 +166,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   @Override
   public void configure(Context ctx) {
 
-    //Can remove in the next release
+    // Can remove in the next release
     translateOldProps(ctx);
 
     topicStr = ctx.getString(TOPIC_CONFIG);
@@ -217,7 +217,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
     }
 
-    //Broker List
+    // Broker List
     // If there is no value we need to check and set the old param and log a warning message
     if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
       String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
@@ -230,7 +230,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
     }
 
-    //GroupId
+    // GroupId
     // If there is an old Group Id set, then use that if no groupId is set.
     if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
       String oldGroupId = ctx.getString(GROUP_ID_FLUME);
@@ -265,7 +265,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
-    //Defaults overridden based on config
+    // Defaults overridden based on config
     producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
   }
@@ -279,9 +279,9 @@ public class KafkaChannel extends BasicChannelSemantics {
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
-    //Defaults overridden based on config
+    // Defaults overridden based on config
     consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
-    //These always take precedence over config
+    // These always take precedence over config
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -313,8 +313,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     try {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
       if (!kafkaOffsets.isEmpty()) {
-        logger.info("Found Kafka offsets for topic " + topicStr +
-            ". Will not migrate from zookeeper");
+        logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
         logger.debug("Offsets found: {}", kafkaOffsets);
         return;
       }
@@ -373,6 +372,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
+    c.consumer.wakeup();
     c.consumer.close();
   }
 
@@ -434,7 +434,7 @@ public class KafkaChannel extends BasicChannelSemantics {
         if (staticPartitionId != null) {
           partitionId = staticPartitionId;
         }
-        //Allow a specified header to override a static ID
+        // Allow a specified header to override a static ID
         if (partitionHeader != null) {
           String headerVal = event.getHeaders().get(partitionHeader);
           if (headerVal != null) {
@@ -460,6 +460,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     @SuppressWarnings("unchecked")
     @Override
     protected Event doTake() throws InterruptedException {
+      logger.trace("Starting event take");
       type = TransactionType.TAKE;
       try {
         if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
@@ -482,11 +483,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       if (!consumerAndRecords.get().failedEvents.isEmpty()) {
         e = consumerAndRecords.get().failedEvents.removeFirst();
       } else {
-
-        if (logger.isDebugEnabled()) {
-          logger.debug("Assigment: {}", consumerAndRecords.get().consumer.assignment().toString());
+        if (logger.isTraceEnabled()) {
+          logger.trace("Assignment during take: {}",
+              consumerAndRecords.get().consumer.assignment().toString());
         }
-
         try {
           long startTime = System.nanoTime();
           if (!consumerAndRecords.get().recordIterator.hasNext()) {
@@ -497,24 +497,20 @@ public class KafkaChannel extends BasicChannelSemantics {
             e = deserializeValue(record.value(), parseAsFlumeEvent);
             TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
-            consumerAndRecords.get().offsets.put(tp, oam);
-
-            if (logger.isTraceEnabled()) {
-              logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
-            }
+            consumerAndRecords.get().saveOffsets(tp, oam);
 
             //Add the key to the header
             if (record.key() != null) {
               e.getHeaders().put(KEY_HEADER, record.key());
             }
 
-            if (logger.isDebugEnabled()) {
-              logger.debug("Processed output from partition {} offset {}",
-                           record.partition(), record.offset());
-            }
-
             long endTime = System.nanoTime();
             counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("{} processed output from partition {} offset {}",
+                  new Object[] {getName(), record.partition(), record.offset()});
+            }
           } else {
             return null;
           }
@@ -532,6 +528,7 @@ public class KafkaChannel extends BasicChannelSemantics {
 
     @Override
     protected void doCommit() throws InterruptedException {
+      logger.trace("Starting commit");
       if (type.equals(TransactionType.NONE)) {
         return;
       }
@@ -564,15 +561,24 @@ public class KafkaChannel extends BasicChannelSemantics {
                   ex);
         }
       } else {
+        // eventTaken ensures that we have collected events in this
+        // transaction before committing
         if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
+          logger.trace("About to commit batch");
           long startTime = System.nanoTime();
           consumerAndRecords.get().commitOffsets();
           long endTime = System.nanoTime();
           counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
-          consumerAndRecords.get().printCurrentAssignment();
+          if (logger.isDebugEnabled()) {
+            logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
+          }
+        }
+
+        int takes = events.get().size();
+        if (takes > 0) {
+          counter.addToEventTakeSuccessCount(takes);
+          events.get().clear();
         }
-        counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
-        events.get().clear();
       }
     }
 
@@ -585,7 +591,7 @@ public class KafkaChannel extends BasicChannelSemantics {
         producerRecords.get().clear();
         kafkaFutures.get().clear();
       } else {
-        counter.addToRollbackCounter(Long.valueOf(events.get().size()));
+        counter.addToRollbackCounter(events.get().size());
         consumerAndRecords.get().failedEvents.addAll(events.get());
         events.get().clear();
       }
@@ -676,34 +682,59 @@ public class KafkaChannel extends BasicChannelSemantics {
       this.recordIterator = records.iterator();
     }
 
-    void poll() {
-      this.records = consumer.poll(pollTimeout);
-      this.recordIterator = records.iterator();
-      logger.trace("polling");
+    private void poll() {
+      logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
+      try {
+        records = consumer.poll(pollTimeout);
+        recordIterator = records.iterator();
+        logger.debug("{} returned {} records from last poll", getName(), records.count());
+      } catch (WakeupException e) {
+        logger.trace("Consumer woken up for channel {}.", getName());
+      }
     }
 
-    void commitOffsets() {
-      this.consumer.commitSync(offsets);
+    private void commitOffsets() {
+      try {
+        consumer.commitSync(offsets);
+      } catch (Exception e) {
+        logger.info("Error committing offsets.", e);
+      } finally {
+        logger.trace("About to clear offsets map.");
+        offsets.clear();
+      }
     }
 
-    // This will reset the latest assigned partitions to the last committed offsets;
+    private String getOffsetMapString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getName()).append(" current offsets map: ");
+      for (TopicPartition tp : offsets.keySet()) {
+        sb.append("p").append(tp.partition()).append("-")
+            .append(offsets.get(tp).offset()).append(" ");
+      }
+      return sb.toString();
+    }
 
-    public void printCurrentAssignment() {
+    // Builds a string of the current committed offsets, logged when debug is enabled
+    private String getCommittedOffsetsString() {
       StringBuilder sb = new StringBuilder();
-      for (TopicPartition tp : this.consumer.assignment()) {
+      sb.append(getName()).append(" committed: ");
+      for (TopicPartition tp : consumer.assignment()) {
         try {
-          sb.append("Committed: [").append(tp).append(",")
-              .append(this.consumer.committed(tp).offset())
-              .append(",").append(this.consumer.committed(tp).metadata()).append("]");
-          if (logger.isDebugEnabled()) {
-            logger.debug(sb.toString());
-          }
+          sb.append("[").append(tp).append(",")
+              .append(consumer.committed(tp).offset())
+              .append("] ");
         } catch (NullPointerException npe) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Committed {}", tp);
-          }
+          logger.debug("Committed {}", tp);
         }
       }
+      return sb.toString();
+    }
+
+    private void saveOffsets(TopicPartition tp, OffsetAndMetadata oam) {
+      offsets.put(tp, oam);
+      if (logger.isTraceEnabled()) {
+        logger.trace(getOffsetMapString());
+      }
     }
   }
 }
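
A side note on the other behavioral change in this diff: decommissionConsumerAndRecords now calls consumer.wakeup() before close(), and poll() catches the resulting WakeupException. Below is a minimal standalone sketch of how KafkaConsumer.wakeup() interrupts a blocked poll from another thread; the broker address, topic name, and thread wiring are illustrative, not part of the commit.

----------------------------------------------------------------------
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupDemo {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
    props.put("group.id", "wakeup-demo");              // illustrative group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("demo-topic")); // illustrative topic

    Thread poller = new Thread(() -> {
      try {
        while (true) {
          // Blocks for up to the timeout; wakeup() from another thread
          // makes this throw WakeupException immediately.
          ConsumerRecords<String, String> records = consumer.poll(60000L);
          System.out.println("Polled " + records.count() + " records");
        }
      } catch (WakeupException e) {
        // Expected path during shutdown; safe to swallow and fall through.
      } finally {
        consumer.close();
      }
    });
    poller.start();

    Thread.sleep(1000L);
    // wakeup() is the one consumer method that is safe to call from
    // another thread; it unblocks the poll above.
    consumer.wakeup();
    poller.join();
  }
}
----------------------------------------------------------------------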