You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jo...@apache.org on 2015/10/22 00:22:50 UTC
flume git commit: FLUME-2632: High CPU on KafkaSink
Repository: flume
Updated Branches:
refs/heads/trunk 67ed62aa1 -> d6bf08b54
FLUME-2632: High CPU on KafkaSink
(Ashish Paliwal via Johny Rufus)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d6bf08b5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d6bf08b5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d6bf08b5
Branch: refs/heads/trunk
Commit: d6bf08b54e467a6bdc6a5fc0edd41c51200e9da1
Parents: 67ed62a
Author: Johny Rufus <jo...@apache.org>
Authored: Wed Oct 21 15:18:42 2015 -0700
Committer: Johny Rufus <jo...@apache.org>
Committed: Wed Oct 21 15:18:42 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java | 6 ++++++
.../test/java/org/apache/flume/sink/kafka/TestKafkaSink.java | 2 +-
2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index eada17c..38b854b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -98,6 +98,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (event == null) {
// no events available in channel
+ if(processedEvents == 0) {
+ result = Status.BACKOFF;
+ counter.incrementBatchEmptyCount();
+ } else {
+ counter.incrementBatchUnderflowCount();
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/d6bf08b5/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 80f764f..72117b1 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -168,7 +168,7 @@ public class TestKafkaSink {
kafkaSink.start();
Sink.Status status = kafkaSink.process();
- if (status == Sink.Status.BACKOFF) {
+ if (status != Sink.Status.BACKOFF) {
fail("Error Occurred");
}
assertNull(