You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jo...@apache.org on 2015/09/16 03:39:54 UTC

flume git commit: FLUME-2672. NPE in KafkaSourceCounter

Repository: flume
Updated Branches:
  refs/heads/trunk 318da2088 -> 67189ca84


FLUME-2672. NPE in KafkaSourceCounter

(Rigo MacTaggart via Johny Rufus)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/67189ca8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/67189ca8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/67189ca8

Branch: refs/heads/trunk
Commit: 67189ca84d24154150fa2ca4194b3b8d79400bda
Parents: 318da20
Author: Johny Rufus <jo...@apache.org>
Authored: Tue Sep 15 18:35:28 2015 -0700
Committer: Johny Rufus <jo...@apache.org>
Committed: Tue Sep 15 18:35:28 2015 -0700

----------------------------------------------------------------------
 .../kafka/KafkaSourceCounter.java               |  2 +-
 .../kafka/KafkaSourceCounterTest.java           | 63 ++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/67189ca8/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
index 1cb911d..ad0ba2c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
@@ -31,7 +31,7 @@ public class KafkaSourceCounter extends SourceCounter implements  KafkaSourceCou
       "source.kafka.empty.count";
 
   private static final String[] ATTRIBUTES =
-      {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
+      {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET, COUNTER_KAFKA_EMPTY};
 
   public KafkaSourceCounter(String name) {
     super(name, ATTRIBUTES);

http://git-wip-us.apache.org/repos/asf/flume/blob/67189ca8/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
new file mode 100644
index 0000000..4a71265
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaSourceCounterTest {
+
+    KafkaSourceCounter counter;
+
+    @Before
+    public void setUp() throws Exception {
+        counter = new KafkaSourceCounter("test");
+    }
+
+    @Test
+    public void testAddToKafkaEventGetTimer() throws Exception {
+        Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L));
+    }
+
+    @Test
+    public void testAddToKafkaCommitTimer() throws Exception {
+        Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L));
+    }
+
+    @Test
+    public void testIncrementKafkaEmptyCount() throws Exception {
+        Assert.assertEquals(1L, counter.incrementKafkaEmptyCount());
+    }
+
+    @Test
+    public void testGetKafkaCommitTimer() throws Exception {
+        Assert.assertEquals(0, counter.getKafkaCommitTimer());
+    }
+
+    @Test
+    public void testGetKafkaEventGetTimer() throws Exception {
+        Assert.assertEquals(0, counter.getKafkaEventGetTimer());
+    }
+
+    @Test
+    public void testGetKafkaEmptyCount() throws Exception {
+        Assert.assertEquals(0, counter.getKafkaEmptyCount());
+    }
+
+}
\ No newline at end of file