You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/06/07 23:56:34 UTC

git commit: FLUME-2072. JMX metrics support for HBase Sink

Updated Branches:
  refs/heads/trunk e442c29a6 -> 791f443fa


FLUME-2072. JMX metrics support for HBase Sink

(Sravya Tirukkovalur via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/791f443f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/791f443f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/791f443f

Branch: refs/heads/trunk
Commit: 791f443fae173054cf29ac52fee8e9cf7fe70dc7
Parents: e442c29
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Jun 7 14:54:00 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Jun 7 14:55:27 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hbase/HBaseSink.java     |   29 ++++++++++++---
 1 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/791f443f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index a8ec87e..d5996c3 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -24,12 +24,12 @@ import java.util.List;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -88,7 +88,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private HTable table;
   private long batchSize;
   private Configuration config;
-  private CounterGroup counterGroup = new CounterGroup();
   private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
   private HbaseEventSerializer serializer;
   private String eventSerializerType;
@@ -97,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private String kerberosKeytab;
   private User hbaseUser;
   private boolean enableWal = true;
+  private SinkCounter sinkCounter;
 
   public HBaseSink(){
     this(HBaseConfiguration.create());
@@ -116,6 +116,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
           kerberosPrincipal, kerberosKeytab);
       }
     } catch (Exception ex) {
+      sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Failed to login to HBase using "
         + "provided credentials.", ex);
     }
@@ -131,6 +132,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
         }
       });
     } catch (Exception e) {
+      sinkCounter.incrementConnectionFailedCount();
       logger.error("Could not load table, " + tableName +
           " from HBase", e);
       throw new FlumeException("Could not load table, " + tableName +
@@ -149,6 +151,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
     } catch (Exception e) {
       //Get getTableDescriptor also throws IOException, so catch the IOException
       //thrown above or by the getTableDescriptor() call.
+      sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Error getting column family from HBase."
               + "Please verify that the table " + tableName + " and Column Family, "
               + Bytes.toString(columnFamily) + " exists in HBase, and the"
@@ -156,6 +159,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
     }
 
     super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
   }
 
   @Override
@@ -166,6 +171,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
     } catch (IOException e) {
       throw new FlumeException("Error closing table.", e);
     }
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
   }
 
   @SuppressWarnings("unchecked")
@@ -214,6 +221,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
         "writes to HBase will have WAL disabled, and any data in the " +
         "memstore of this region in the Region Server could be lost!");
     }
+    sinkCounter = new SinkCounter(this.getName());
   }
 
   @Override
@@ -224,11 +232,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
     List<Row> actions = new LinkedList<Row>();
     List<Increment> incs = new LinkedList<Increment>();
     txn.begin();
-    for(long i = 0; i < batchSize; i++) {
+    long i = 0;
+    for(; i < batchSize; i++) {
       Event event = channel.take();
       if(event == null){
         status = Status.BACKOFF;
-        counterGroup.incrementAndGet("channel.underflow");
+        if (i == 0) {
+          sinkCounter.incrementBatchEmptyCount();
+        } else {
+          sinkCounter.incrementBatchUnderflowCount();
+        }
         break;
       } else {
         serializer.initialize(event, columnFamily);
@@ -236,6 +249,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
         incs.addAll(serializer.getIncrements());
       }
     }
+    if (i == batchSize) {
+      sinkCounter.incrementBatchCompleteCount();
+    }
+    sinkCounter.addToEventDrainAttemptCount(i);
+
     putEventsAndCommit(actions, incs, txn);
     return status;
   }
@@ -272,7 +290,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
       });
 
       txn.commit();
-      counterGroup.incrementAndGet("transaction.success");
+      sinkCounter.addToEventDrainSuccessCount(actions.size());
     } catch (Throwable e) {
       try{
         txn.rollback();
@@ -280,7 +298,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
         logger.error("Exception in rollback. Rollback might not have been" +
             "successful." , e2);
       }
-      counterGroup.incrementAndGet("transaction.rollback");
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
       if(e instanceof Error || e instanceof RuntimeException){