You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/06/07 23:56:34 UTC
git commit: FLUME-2072. JMX metrics support for HBase Sink
Updated Branches:
refs/heads/trunk e442c29a6 -> 791f443fa
FLUME-2072. JMX metrics support for HBase Sink
(Sravya Tirukkovalur via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/791f443f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/791f443f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/791f443f
Branch: refs/heads/trunk
Commit: 791f443fae173054cf29ac52fee8e9cf7fe70dc7
Parents: e442c29
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Jun 7 14:54:00 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Jun 7 14:55:27 2013 -0700
----------------------------------------------------------------------
.../org/apache/flume/sink/hbase/HBaseSink.java | 29 ++++++++++++---
1 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/791f443f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index a8ec87e..d5996c3 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -24,12 +24,12 @@ import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -88,7 +88,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
private HTable table;
private long batchSize;
private Configuration config;
- private CounterGroup counterGroup = new CounterGroup();
private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
private HbaseEventSerializer serializer;
private String eventSerializerType;
@@ -97,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
private String kerberosKeytab;
private User hbaseUser;
private boolean enableWal = true;
+ private SinkCounter sinkCounter;
public HBaseSink(){
this(HBaseConfiguration.create());
@@ -116,6 +116,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
kerberosPrincipal, kerberosKeytab);
}
} catch (Exception ex) {
+ sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Failed to login to HBase using "
+ "provided credentials.", ex);
}
@@ -131,6 +132,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
});
} catch (Exception e) {
+ sinkCounter.incrementConnectionFailedCount();
logger.error("Could not load table, " + tableName +
" from HBase", e);
throw new FlumeException("Could not load table, " + tableName +
@@ -149,6 +151,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
} catch (Exception e) {
//Get getTableDescriptor also throws IOException, so catch the IOException
//thrown above or by the getTableDescriptor() call.
+ sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Error getting column family from HBase."
+ "Please verify that the table " + tableName + " and Column Family, "
+ Bytes.toString(columnFamily) + " exists in HBase, and the"
@@ -156,6 +159,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
super.start();
+ sinkCounter.incrementConnectionCreatedCount();
+ sinkCounter.start();
}
@Override
@@ -166,6 +171,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
} catch (IOException e) {
throw new FlumeException("Error closing table.", e);
}
+ sinkCounter.incrementConnectionClosedCount();
+ sinkCounter.stop();
}
@SuppressWarnings("unchecked")
@@ -214,6 +221,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
"writes to HBase will have WAL disabled, and any data in the " +
"memstore of this region in the Region Server could be lost!");
}
+ sinkCounter = new SinkCounter(this.getName());
}
@Override
@@ -224,11 +232,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();
txn.begin();
- for(long i = 0; i < batchSize; i++) {
+ long i = 0;
+ for(; i < batchSize; i++) {
Event event = channel.take();
if(event == null){
status = Status.BACKOFF;
- counterGroup.incrementAndGet("channel.underflow");
+ if (i == 0) {
+ sinkCounter.incrementBatchEmptyCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
break;
} else {
serializer.initialize(event, columnFamily);
@@ -236,6 +249,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
incs.addAll(serializer.getIncrements());
}
}
+ if (i == batchSize) {
+ sinkCounter.incrementBatchCompleteCount();
+ }
+ sinkCounter.addToEventDrainAttemptCount(i);
+
putEventsAndCommit(actions, incs, txn);
return status;
}
@@ -272,7 +290,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
});
txn.commit();
- counterGroup.incrementAndGet("transaction.success");
+ sinkCounter.addToEventDrainSuccessCount(actions.size());
} catch (Throwable e) {
try{
txn.rollback();
@@ -280,7 +298,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
logger.error("Exception in rollback. Rollback might not have been" +
"successful." , e2);
}
- counterGroup.incrementAndGet("transaction.rollback");
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
if(e instanceof Error || e instanceof RuntimeException){