You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jo...@apache.org on 2015/10/22 00:22:50 UTC

flume git commit: FLUME-2632: High CPU on KafkaSink

Repository: flume
Updated Branches:
  refs/heads/trunk 67ed62aa1 -> d6bf08b54


FLUME-2632: High CPU on KafkaSink

(Ashish Paliwal via Johny Rufus)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d6bf08b5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d6bf08b5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d6bf08b5

Branch: refs/heads/trunk
Commit: d6bf08b54e467a6bdc6a5fc0edd41c51200e9da1
Parents: 67ed62a
Author: Johny Rufus <jo...@apache.org>
Authored: Wed Oct 21 15:18:42 2015 -0700
Committer: Johny Rufus <jo...@apache.org>
Committed: Wed Oct 21 15:18:42 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java   | 6 ++++++
 .../test/java/org/apache/flume/sink/kafka/TestKafkaSink.java   | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index eada17c..38b854b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -98,6 +98,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
         if (event == null) {
           // no events available in channel
+          if(processedEvents == 0) {
+            result = Status.BACKOFF;
+            counter.incrementBatchEmptyCount();
+          } else {
+            counter.incrementBatchUnderflowCount();
+          }
           break;
         }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 80f764f..72117b1 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -168,7 +168,7 @@ public class TestKafkaSink {
     kafkaSink.start();
 
     Sink.Status status = kafkaSink.process();
-    if (status == Sink.Status.BACKOFF) {
+    if (status != Sink.Status.BACKOFF) {
       fail("Error Occurred");
     }
     assertNull(