You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jo...@apache.org on 2015/09/16 03:39:54 UTC
flume git commit: FLUME-2672. NPE in KafkaSourceCounter
Repository: flume
Updated Branches:
refs/heads/trunk 318da2088 -> 67189ca84
FLUME-2672. NPE in KafkaSourceCounter
(Rigo MacTaggart via Johny Rufus)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/67189ca8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/67189ca8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/67189ca8
Branch: refs/heads/trunk
Commit: 67189ca84d24154150fa2ca4194b3b8d79400bda
Parents: 318da20
Author: Johny Rufus <jo...@apache.org>
Authored: Tue Sep 15 18:35:28 2015 -0700
Committer: Johny Rufus <jo...@apache.org>
Committed: Tue Sep 15 18:35:28 2015 -0700
----------------------------------------------------------------------
.../kafka/KafkaSourceCounter.java | 2 +-
.../kafka/KafkaSourceCounterTest.java | 63 ++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/67189ca8/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
index 1cb911d..ad0ba2c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
@@ -31,7 +31,7 @@ public class KafkaSourceCounter extends SourceCounter implements KafkaSourceCou
"source.kafka.empty.count";
private static final String[] ATTRIBUTES =
- {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
+ {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET, COUNTER_KAFKA_EMPTY};
public KafkaSourceCounter(String name) {
super(name, ATTRIBUTES);
http://git-wip-us.apache.org/repos/asf/flume/blob/67189ca8/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
new file mode 100644
index 0000000..4a71265
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaSourceCounterTest {
+
+ KafkaSourceCounter counter;
+
+ @Before
+ public void setUp() throws Exception {
+ counter = new KafkaSourceCounter("test");
+ }
+
+ @Test
+ public void testAddToKafkaEventGetTimer() throws Exception {
+ Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L));
+ }
+
+ @Test
+ public void testAddToKafkaCommitTimer() throws Exception {
+ Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L));
+ }
+
+ @Test
+ public void testIncrementKafkaEmptyCount() throws Exception {
+ Assert.assertEquals(1L, counter.incrementKafkaEmptyCount());
+ }
+
+ @Test
+ public void testGetKafkaCommitTimer() throws Exception {
+ Assert.assertEquals(0, counter.getKafkaCommitTimer());
+ }
+
+ @Test
+ public void testGetKafkaEventGetTimer() throws Exception {
+ Assert.assertEquals(0, counter.getKafkaEventGetTimer());
+ }
+
+ @Test
+ public void testGetKafkaEmptyCount() throws Exception {
+ Assert.assertEquals(0, counter.getKafkaEmptyCount());
+ }
+
+}
\ No newline at end of file