You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/01/17 01:25:56 UTC
flume git commit: FLUME-2562. Add metrics for Kafka Source,
Kafka Sink and Kafka Channel.
Repository: flume
Updated Branches:
refs/heads/trunk 199684b62 -> 1d9bab676
FLUME-2562. Add metrics for Kafka Source, Kafka Sink and Kafka Channel.
(Gwen Shapira via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1d9bab67
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1d9bab67
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1d9bab67
Branch: refs/heads/trunk
Commit: 1d9bab6760df38e538705a74dd599de03129777b
Parents: 199684b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Jan 16 16:24:24 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Jan 16 16:24:24 2015 -0800
----------------------------------------------------------------------
.../flume/channel/kafka/KafkaChannel.java | 25 +++++-
.../flume/instrumentation/ChannelCounter.java | 7 ++
.../flume/instrumentation/SinkCounter.java | 8 ++
.../flume/instrumentation/SourceCounter.java | 7 ++
.../kafka/KafkaChannelCounter.java | 82 ++++++++++++++++++++
.../kafka/KafkaChannelCounterMBean.java | 50 ++++++++++++
.../instrumentation/kafka/KafkaSinkCounter.java | 53 +++++++++++++
.../kafka/KafkaSinkCounterMBean.java | 48 ++++++++++++
.../kafka/KafkaSourceCounter.java | 64 +++++++++++++++
.../kafka/KafkaSourceCounterMBean.java | 47 +++++++++++
.../org/apache/flume/sink/kafka/KafkaSink.java | 15 ++++
.../apache/flume/source/kafka/KafkaSource.java | 18 +++++
12 files changed, 423 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index d767aac..80a122d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -36,6 +36,7 @@ import org.apache.flume.conf.ConfigurationException;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,8 @@ public class KafkaChannel extends BasicChannelSemantics {
private final List<ConsumerAndIterator> consumers =
Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
+ private KafkaChannelCounter counter;
+
/* Each ConsumerConnector commit will commit all partitions owned by it. To
* ensure that each partition is only committed when all events are
* actually done, we will need to keep a ConsumerConnector per thread.
@@ -95,6 +98,7 @@ public class KafkaChannel extends BasicChannelSemantics {
// We always have just one topic being read by one thread
LOGGER.info("Topic = " + topic.get());
topicCountMap.put(topic.get(), 1);
+ counter.start();
super.start();
} catch (Exception e) {
LOGGER.error("Could not start producer");
@@ -114,7 +118,10 @@ public class KafkaChannel extends BasicChannelSemantics {
}
}
producer.close();
+ counter.stop();
super.stop();
+ LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(),
+ counter);
}
@Override
@@ -192,6 +199,10 @@ public class KafkaChannel extends BasicChannelSemantics {
kafkaConf.put("auto.offset.reset", "smallest");
}
+ if (counter == null) {
+ counter = new KafkaChannelCounter(getName());
+ }
+
}
private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
@@ -291,7 +302,10 @@ public class KafkaChannel extends BasicChannelSemantics {
} else {
try {
ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
+ long startTime = System.nanoTime();
it.hasNext();
+ long endTime = System.nanoTime();
+ counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
if (parseAsFlumeEvent) {
ByteArrayInputStream in =
new ByteArrayInputStream(it.next().message());
@@ -339,7 +353,11 @@ public class KafkaChannel extends BasicChannelSemantics {
messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
batchUUID, event));
}
+ long startTime = System.nanoTime();
producer.send(messages);
+ long endTime = System.nanoTime();
+ counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
+ counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
serializedEvents.get().clear();
} catch (Exception ex) {
LOGGER.warn("Sending events to Kafka failed", ex);
@@ -348,8 +366,12 @@ public class KafkaChannel extends BasicChannelSemantics {
}
} else {
if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+ long startTime = System.nanoTime();
consumerAndIter.get().consumer.commitOffsets();
- }
+ long endTime = System.nanoTime();
+ counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000));
+ }
+ counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
events.get().clear();
}
}
@@ -362,6 +384,7 @@ public class KafkaChannel extends BasicChannelSemantics {
if (type.equals(TransactionType.PUT)) {
serializedEvents.get().clear();
} else {
+ counter.addToRollbackCounter(Long.valueOf(events.get().size()));
consumerAndIter.get().failedEvents.addAll(events.get());
events.get().clear();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
index 9938c0a..977ad6c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
@@ -18,6 +18,8 @@
*/
package org.apache.flume.instrumentation;
+import org.apache.commons.lang.ArrayUtils;
+
public class ChannelCounter extends MonitoredCounterGroup implements
ChannelCounterMBean {
@@ -48,6 +50,11 @@ public class ChannelCounter extends MonitoredCounterGroup implements
super(MonitoredCounterGroup.Type.CHANNEL, name, ATTRIBUTES);
}
+ public ChannelCounter(String name, String[] attributes) {
+ super(MonitoredCounterGroup.Type.CHANNEL, name,
+ (String[])ArrayUtils.addAll(attributes,ATTRIBUTES));
+ }
+
@Override
public long getChannelSize() {
return get(COUNTER_CHANNEL_SIZE);
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
index 41b28cf..54f4a4c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
@@ -17,6 +17,8 @@
*/
package org.apache.flume.instrumentation;
+import org.apache.commons.lang.ArrayUtils;
+
public class SinkCounter extends MonitoredCounterGroup implements
SinkCounterMBean {
@@ -56,6 +58,12 @@ public class SinkCounter extends MonitoredCounterGroup implements
super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
}
+ public SinkCounter(String name, String[] attributes) {
+ super(MonitoredCounterGroup.Type.SINK, name,
+ (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES));
+ }
+
+
@Override
public long getConnectionCreatedCount() {
return get(COUNTER_CONNECTION_CREATED);
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index 972d2c6..02ef6ed 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -18,6 +18,8 @@
*/
package org.apache.flume.instrumentation;
+import org.apache.commons.lang.ArrayUtils;
+
public class SourceCounter extends MonitoredCounterGroup implements
SourceCounterMBean {
@@ -53,6 +55,11 @@ public class SourceCounter extends MonitoredCounterGroup implements
super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
}
+ public SourceCounter(String name, String[] attributes) {
+ super(Type.SOURCE, name,
+ (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES));
+ }
+
@Override
public long getEventReceivedCount() {
return get(COUNTER_EVENTS_RECEIVED);
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
new file mode 100644
index 0000000..6e142cf
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.instrumentation.ChannelCounter;
+
+public class KafkaChannelCounter extends ChannelCounter
+ implements KafkaChannelCounterMBean {
+
+
+ private static final String TIMER_KAFKA_EVENT_GET =
+ "channel.kafka.event.get.time";
+
+ private static final String TIMER_KAFKA_EVENT_SEND =
+ "channel.kafka.event.send.time";
+
+ private static final String TIMER_KAFKA_COMMIT =
+ "channel.kafka.commit.time";
+
+ private static final String COUNT_ROLLBACK =
+ "channel.rollback.count";
+
+
+ private static String[] ATTRIBUTES = {
+ TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET,
+ COUNT_ROLLBACK
+ };
+
+
+ public KafkaChannelCounter(String name) {
+ super(name,ATTRIBUTES);
+ }
+
+ public long addToKafkaEventGetTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_EVENT_GET, delta);
+ }
+
+ public long addToKafkaEventSendTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_EVENT_SEND, delta);
+ }
+
+ public long addToKafkaCommitTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_COMMIT, delta);
+ }
+
+ public long addToRollbackCounter(long delta) {
+ return addAndGet(COUNT_ROLLBACK,delta);
+ }
+
+ public long getKafkaEventGetTimer() {
+ return get(TIMER_KAFKA_EVENT_GET);
+ }
+
+ public long getKafkaEventSendTimer() {
+ return get(TIMER_KAFKA_EVENT_SEND);
+ }
+
+ public long getKafkaCommitTimer() {
+ return get(TIMER_KAFKA_COMMIT);
+ }
+
+ public long getRollbackCount() {
+ return get(COUNT_ROLLBACK);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
new file mode 100644
index 0000000..da64f0c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaChannelCounterMBean {
+
+ long getKafkaEventGetTimer();
+
+ long getKafkaEventSendTimer();
+
+ long getKafkaCommitTimer();
+
+ long getRollbackCount();
+
+ long getChannelSize();
+
+ long getEventPutAttemptCount();
+
+ long getEventTakeAttemptCount();
+
+ long getEventPutSuccessCount();
+
+ long getEventTakeSuccessCount();
+
+ long getStartTime();
+
+ long getStopTime();
+
+ long getChannelCapacity();
+
+ String getType();
+
+ double getChannelFillPercentage();
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
new file mode 100644
index 0000000..1308ff3
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.flume.instrumentation.SinkCounter;
+
+public class KafkaSinkCounter extends SinkCounter implements KafkaSinkCounterMBean {
+
+ private static final String TIMER_KAFKA_EVENT_SEND =
+ "channel.kafka.event.send.time";
+
+ private static final String COUNT_ROLLBACK =
+ "channel.rollback.count";
+
+ private static final String[] ATTRIBUTES =
+ {COUNT_ROLLBACK,TIMER_KAFKA_EVENT_SEND};
+
+ public KafkaSinkCounter(String name) {
+ super(name,ATTRIBUTES);
+ }
+
+ public long addToKafkaEventSendTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_EVENT_SEND,delta);
+ }
+
+ public long incrementRollbackCount() {
+ return increment(COUNT_ROLLBACK);
+ }
+
+ public long getKafkaEventSendTimer() {
+ return get(TIMER_KAFKA_EVENT_SEND);
+ }
+
+ public long getRollbackCount() {
+ return get(COUNT_ROLLBACK);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
new file mode 100644
index 0000000..f49ca26
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaSinkCounterMBean {
+
+ long getKafkaEventSendTimer();
+
+ long getRollbackCount();
+
+ long getConnectionCreatedCount();
+
+ long getConnectionClosedCount();
+
+ long getConnectionFailedCount();
+
+ long getBatchEmptyCount();
+
+ long getBatchUnderflowCount();
+
+ long getBatchCompleteCount();
+
+ long getEventDrainAttemptCount();
+
+ long getEventDrainSuccessCount();
+
+ long getStartTime();
+
+ long getStopTime();
+
+ String getType();
+}
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
new file mode 100644
index 0000000..1cb911d
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.flume.instrumentation.SourceCounter;
+
+public class KafkaSourceCounter extends SourceCounter implements KafkaSourceCounterMBean {
+
+ private static final String TIMER_KAFKA_EVENT_GET =
+ "source.kafka.event.get.time";
+
+ private static final String TIMER_KAFKA_COMMIT =
+ "source.kafka.commit.time";
+
+ private static final String COUNTER_KAFKA_EMPTY =
+ "source.kafka.empty.count";
+
+ private static final String[] ATTRIBUTES =
+ {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
+
+ public KafkaSourceCounter(String name) {
+ super(name, ATTRIBUTES);
+ }
+
+ public long addToKafkaEventGetTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_EVENT_GET,delta);
+ }
+
+ public long addToKafkaCommitTimer(long delta) {
+ return addAndGet(TIMER_KAFKA_COMMIT,delta);
+ }
+
+ public long incrementKafkaEmptyCount() {
+ return increment(COUNTER_KAFKA_EMPTY);
+ }
+
+ public long getKafkaCommitTimer() {
+ return get(TIMER_KAFKA_COMMIT);
+ }
+
+ public long getKafkaEventGetTimer() {
+ return get(TIMER_KAFKA_EVENT_GET);
+ }
+
+ public long getKafkaEmptyCount() {
+ return get(COUNTER_KAFKA_EMPTY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
new file mode 100644
index 0000000..219a5b6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaSourceCounterMBean {
+
+ long getKafkaEventGetTimer();
+
+ long getKafkaCommitTimer();
+
+ long getKafkaEmptyCount();
+
+ long getEventReceivedCount();
+
+ long getEventAcceptedCount();
+
+ long getAppendReceivedCount();
+
+ long getAppendAcceptedCount();
+
+ long getAppendBatchReceivedCount();
+
+ long getAppendBatchAcceptedCount();
+
+ long getStartTime();
+
+ long getStopTime();
+
+ String getType();
+
+ long getOpenConnectionCount();
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index a90b950..eada17c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
private String topic;
private int batchSize;
private List<KeyedMessage<String, byte[]>> messageList;
+ private KafkaSinkCounter counter;
+
@Override
public Status process() throws EventDeliveryException {
@@ -122,7 +125,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
// publish batch and commit.
if (processedEvents > 0) {
+ long startTime = System.nanoTime();
producer.send(messageList);
+ long endTime = System.nanoTime();
+ counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
+ counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
}
transaction.commit();
@@ -134,6 +141,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (transaction != null) {
try {
transaction.rollback();
+ counter.incrementRollbackCount();
} catch (Exception e) {
logger.error("Transaction rollback failed", e);
throw Throwables.propagate(e);
@@ -154,12 +162,15 @@ public class KafkaSink extends AbstractSink implements Configurable {
// instantiate the producer
ProducerConfig config = new ProducerConfig(kafkaProps);
producer = new Producer<String, byte[]>(config);
+ counter.start();
super.start();
}
@Override
public synchronized void stop() {
producer.close();
+ counter.stop();
+ logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter);
super.stop();
}
@@ -202,5 +213,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
if (logger.isDebugEnabled()) {
logger.debug("Kafka producer properties: " + kafkaProps);
}
+
+ if (counter == null) {
+ counter = new KafkaSinkCounter(getName());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 00a81c6..3777639 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -32,6 +32,8 @@ import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
@@ -77,6 +79,7 @@ public class KafkaSource extends AbstractSource
private Context context;
private Properties kafkaProps;
private final List<Event> eventList = new ArrayList<Event>();
+ private KafkaSourceCounter counter;
public Status process() throws EventDeliveryException {
@@ -88,6 +91,7 @@ public class KafkaSource extends AbstractSource
long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
try {
boolean iterStatus = false;
+ long startTime = System.nanoTime();
while (eventList.size() < batchUpperLimit &&
System.currentTimeMillis() < batchEndTime) {
iterStatus = hasNext();
@@ -116,22 +120,30 @@ public class KafkaSource extends AbstractSource
log.debug("Event #: {}", eventList.size());
}
}
+ long endTime = System.nanoTime();
+ counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
+ counter.addToEventReceivedCount(Long.valueOf(eventList.size()));
// If we have events, send events to channel
// clear the event list
// and commit if Kafka doesn't auto-commit
if (eventList.size() > 0) {
getChannelProcessor().processEventBatch(eventList);
+ counter.addToEventAcceptedCount(eventList.size());
eventList.clear();
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
if (!kafkaAutoCommitEnabled) {
// commit the read transactions to Kafka to avoid duplicates
+ long commitStartTime = System.nanoTime();
consumer.commitOffsets();
+ long commitEndTime = System.nanoTime();
+ counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000));
}
}
if (!iterStatus) {
if (log.isDebugEnabled()) {
+ counter.incrementKafkaEmptyCount();
log.debug("Returning with backoff. No more data to read");
}
return Status.BACKOFF;
@@ -174,6 +186,9 @@ public class KafkaSource extends AbstractSource
kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+ if (counter == null) {
+ counter = new KafkaSourceCounter(getName());
+ }
}
@Override
@@ -207,6 +222,7 @@ public class KafkaSource extends AbstractSource
throw new FlumeException("Unable to get message iterator from Kafka", e);
}
log.info("Kafka source {} started.", getName());
+ counter.start();
super.start();
}
@@ -217,6 +233,8 @@ public class KafkaSource extends AbstractSource
// to avoid reading the same messages again
consumer.shutdown();
}
+ counter.stop();
+ log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
super.stop();
}