You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/10/04 02:27:51 UTC

git commit: FLUME-2202. AsyncHBaseSink should coalesce increments to reduce RPC roundtrips

Updated Branches:
  refs/heads/trunk b84d01615 -> c4e2129fd


FLUME-2202. AsyncHBaseSink should coalesce increments to reduce RPC roundtrips

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c4e2129f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c4e2129f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c4e2129f

Branch: refs/heads/trunk
Commit: c4e2129fd12f97303a1b8120a2ecf7da456e1b77
Parents: b84d016
Author: Mike Percy <mp...@cloudera.com>
Authored: Thu Oct 3 17:25:57 2013 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Thu Oct 3 17:25:57 2013 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  38 +++---
 .../apache/flume/sink/hbase/AsyncHBaseSink.java | 121 +++++++++++++++++--
 .../hbase/HBaseSinkConfigurationConstants.java  |   4 +
 .../hbase/IncrementAsyncHBaseSerializer.java    |  78 ++++++++++++
 .../flume/sink/hbase/TestAsyncHBaseSink.java    |  82 ++++++++++++-
 5 files changed, 291 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5a59b56..dc8d05d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1850,30 +1850,32 @@ AsyncHBaseSink
 ''''''''''''''
 
 This sink writes data to HBase using an asynchronous model. A class implementing
-AsyncHbaseEventSerializer
-which is specified by the configuration is used to convert the events into
+AsyncHbaseEventSerializer, which is specified by the configuration, is used to convert the events into
 HBase puts and/or increments. These puts and increments are then written
-to HBase. This sink provides the same consistency guarantees as HBase,
+to HBase. This sink uses the `Asynchbase API <https://github.com/OpenTSDB/asynchbase>`_ to write to
+HBase. This sink provides the same consistency guarantees as HBase,
 which is currently row-wise atomicity. In the event of Hbase failing to
 write certain events, the sink will replay all events in that transaction.
 The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
 Required properties are in **bold**.
 
-================  ============================================================  ====================================================================================
-Property Name     Default                                                       Description
-================  ============================================================  ====================================================================================
-**channel**       --
-**type**          --                                                            The component type name, needs to be ``asynchbase``
-**table**         --                                                            The name of the table in Hbase to write to.
-zookeeperQuorum   --                                                            The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
-znodeParent       /hbase                                                        The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
-**columnFamily**  --                                                            The column family in Hbase to write to.
-batchSize         100                                                           Number of events to be written per txn.
-timeout           60000                                                         The length of time (in milliseconds) the sink waits for acks from hbase for
-                                                                                all events in a transaction.
-serializer        org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
-serializer.*      --                                                            Properties to be passed to the serializer.
-================  ============================================================  ====================================================================================
+===================  ============================================================  ====================================================================================
+Property Name        Default                                                       Description
+===================  ============================================================  ====================================================================================
+**channel**          --
+**type**             --                                                            The component type name, needs to be ``asynchbase``
+**table**            --                                                            The name of the table in Hbase to write to.
+zookeeperQuorum      --                                                            The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent          /hbase                                                        The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
+**columnFamily**     --                                                            The column family in Hbase to write to.
+batchSize            100                                                           Number of events to be written per txn.
+coalesceIncrements   false                                                         Whether the sink should coalesce multiple increments to the same cell within a batch. This may give
+                                                                                   better performance when many increments target a limited number of cells.
+timeout              60000                                                         The length of time (in milliseconds) the sink waits for acks from hbase for
+                                                                                   all events in a transaction.
+serializer           org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
+serializer.*         --                                                            Properties to be passed to the serializer.
+===================  ============================================================  ====================================================================================
 
 Note that this sink takes the Zookeeper Quorum and parent znode information in
 the configuration. Zookeeper Quorum and parent node configuration may be

http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 5e297b1..0545554 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -18,13 +18,19 @@
  */
 package org.apache.flume.sink.hbase;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.UnsignedBytes;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -113,20 +119,32 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private String zkQuorum;
   private String zkBaseDir;
   private ExecutorService sinkCallbackPool;
-  private boolean isTest;
+  private boolean isTimeoutTest;
+  private boolean isCoalesceTest;
   private boolean enableWal = true;
+  private boolean batchIncrements = false;
+  private volatile int totalCallbacksReceived = 0;
+  private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
+
+  // Unsigned lexicographic byte[] comparator from Guava, used to compare row
+  // keys and qualifiers of buffered increments when coalescing.
+  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes
+    .lexicographicalComparator();
 
   public AsyncHBaseSink(){
     this(null);
   }
 
   public AsyncHBaseSink(Configuration conf) {
-    this(conf, false);
+    this(conf, false, false);
   }
 
-  AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) {
+  @VisibleForTesting
+  AsyncHBaseSink(Configuration conf, boolean isTimeoutTest,
+    boolean isCoalesceTest) {
     this.conf = conf;
-    isTest = isTimeoutTesting;
+    this.isTimeoutTest = isTimeoutTest;
+    this.isCoalesceTest = isCoalesceTest;
   }
 
   @Override
@@ -138,7 +156,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
      * the next one is being processed.
      *
      */
-    if(!open){
+    if (!open) {
       throw new EventDeliveryException("Sink was never opened. " +
           "Please fix the configuration.");
     }
@@ -147,6 +165,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     AtomicInteger callbacksExpected = new AtomicInteger(0);
     final Lock lock = new ReentrantLock();
     final Condition condition = lock.newCondition();
+    if (incrementBuffer != null) {
+      incrementBuffer.clear();
+    }
     /*
      * Callbacks can be reused per transaction, since they share the same
      * locks and conditions.
@@ -185,18 +206,41 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
           serializer.setEvent(event);
           List<PutRequest> actions = serializer.getActions();
           List<AtomicIncrementRequest> increments = serializer.getIncrements();
-          callbacksExpected.addAndGet(actions.size() + increments.size());
+          callbacksExpected.addAndGet(actions.size());
+          if (!batchIncrements) {
+            callbacksExpected.addAndGet(increments.size());
+          }
 
           for (PutRequest action : actions) {
             action.setDurable(enableWal);
             client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
           }
           for (AtomicIncrementRequest increment : increments) {
-            client.atomicIncrement(increment).addCallbacks(
-                    incrementSuccessCallback, incrementFailureCallback);
+            if (batchIncrements) {
+              CellIdentifier identifier = new CellIdentifier(increment.key(),
+                increment.qualifier());
+              AtomicIncrementRequest request
+                = incrementBuffer.get(identifier);
+              if (request == null) {
+                incrementBuffer.put(identifier, increment);
+              } else {
+                request.setAmount(request.getAmount() + increment.getAmount());
+              }
+            } else {
+              client.atomicIncrement(increment).addCallbacks(
+                incrementSuccessCallback, incrementFailureCallback);
+            }
           }
         }
       }
+      if (batchIncrements) {
+        Collection<AtomicIncrementRequest> increments = incrementBuffer.values();
+        for (AtomicIncrementRequest increment : increments) {
+          client.atomicIncrement(increment).addCallbacks(
+            incrementSuccessCallback, incrementFailureCallback);
+        }
+        callbacksExpected.addAndGet(increments.size());
+      }
       client.flush();
     } catch (Throwable e) {
       this.handleTransactionFailure(txn);
@@ -216,7 +260,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         timeRemaining = timeout - (System.nanoTime() - startTime);
         timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0;
         try {
-          if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){
+          if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) {
             txnFail.set(true);
             logger.warn("HBase callbacks timed out. "
                     + "Transaction will be rolled back.");
@@ -231,6 +275,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       lock.unlock();
     }
 
+    if (isCoalesceTest) {
+      totalCallbacksReceived += callbacksReceived.get();
+    }
+
     /*
      * At this point, either the txn has failed
      * or all callbacks received and txn is successful.
@@ -246,7 +294,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       throw new EventDeliveryException("Could not write events to Hbase. " +
           "Transaction failed, and rolled back.");
     } else {
-      try{
+      try {
         txn.commit();
         txn.close();
         sinkCounter.addToEventDrainSuccessCount(i);
@@ -334,6 +382,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         "All writes to HBase will have WAL disabled, and any data in the " +
         "memstore of this region in the Region Server could be lost!");
     }
+
+    batchIncrements = context.getBoolean(
+      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+    if(batchIncrements) {
+      incrementBuffer = Maps.newHashMap();
+      logger.info("Increment coalescing is enabled. Increments will be " +
+        "buffered.");
+    }
+  }
+
+  @VisibleForTesting
+  int getTotalCallbacksReceived() {
+    return this.totalCallbacksReceived;
   }
 
   @VisibleForTesting
@@ -346,7 +409,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
             + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
-    if (!isTest) {
+    if (!isTimeoutTest) {
       sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(this.getName() + " HBase Call Pool").build());
     } else {
@@ -447,7 +510,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       lock = lck;
       this.callbacksReceived = callbacksReceived;
       this.condition = condition;
-      isTimeoutTesting = isTest;
+      isTimeoutTesting = isTimeoutTest;
     }
 
     @Override
@@ -487,7 +550,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       this.callbacksReceived = callbacksReceived;
       this.txnFail = txnFail;
       this.condition = condition;
-      isTimeoutTesting = isTest;
+      isTimeoutTesting = isTimeoutTest;
     }
 
     @Override
@@ -525,4 +588,36 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
     throw new EventDeliveryException("Error in processing transaction.", e);
   }
+
+  private class CellIdentifier {
+    private final byte[] row;
+    private final byte[] column;
+    private final int hashCode;
+    // Identifies a cell by row key and qualifier only: table and column
+    // family are assumed to be the sink's single configured table and cf.
+    public CellIdentifier(byte[] row, byte[] column) {
+      this.row = row;
+      this.column = column;
+      // Combine the two array hashes the way List/Arrays.hashCode do:
+      // order-sensitive, and not forced to 0 when one component hashes to 0.
+      this.hashCode = 31 * Arrays.hashCode(row) + Arrays.hashCode(column);
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+
+    // Equal iff both row key and qualifier match byte-for-byte; any object
+    // that is not a CellIdentifier (including null) is never equal.
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof CellIdentifier)) {
+        return false;
+      }
+      CellIdentifier o = (CellIdentifier) other;
+      return COMPARATOR.compare(row, o.row) == 0
+        && COMPARATOR.compare(column, o.column) == 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 7fdc75b..1a78071 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -66,4 +66,8 @@ public class HBaseSinkConfigurationConstants {
   public static final String DEFAULT_ZK_ZNODE_PARENT =
       HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
 
+  public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements";
+  /** Increment coalescing is disabled unless explicitly configured. */
+  public static final boolean DEFAULT_COALESCE_INCREMENTS = false;
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
new file mode 100644
index 0000000..b8aefe8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.PutRequest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test serializer emitting no puts and one increment of 1 per event on the
+ * configured "column" qualifier (default "col") of the row keyed by the body.
+ */
+public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer {
+  private byte[] table;
+  private byte[] cf;
+  private byte[] column;
+  private Event currentEvent;
+  @Override
+  public void initialize(byte[] table, byte[] cf) {
+    this.table = table;
+    this.cf = cf;
+  }
+
+  @Override
+  public void setEvent(Event event) {
+    this.currentEvent = event;
+  }
+
+  @Override
+  public List<PutRequest> getActions() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<AtomicIncrementRequest> getIncrements() {
+    List<AtomicIncrementRequest> incrs
+      = new ArrayList<AtomicIncrementRequest>();
+    incrs.add(new AtomicIncrementRequest(table, currentEvent.getBody(), cf,
+      column, 1));
+    return incrs;
+  }
+
+  @Override
+  public void cleanUp() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    column = context.getString("column", "col")
+      .getBytes(java.nio.charset.Charset.forName("UTF-8"));
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index a0c04eb..ccbc086 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -204,7 +204,7 @@ public class TestAsyncHBaseSink {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
-      true);
+      true, false);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, ctx);
@@ -271,6 +271,86 @@ public class TestAsyncHBaseSink {
   }
 
   @Test
+  public void testMultipleBatchesBatchIncrementsWithCoalescing()
+    throws Exception {
+    doTestMultipleBatchesBatchIncrements(true);
+  }
+
+  @Test
+  public void testMultipleBatchesBatchIncrementsNoCoalescing()
+    throws Exception {
+    doTestMultipleBatchesBatchIncrements(false);
+  }
+
+  public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws
+    Exception {
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
+      false, true);
+    if (coalesce) {
+      ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        "true");
+    }
+    ctx.put("batchSize", "2");
+    ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName());
+    ctx.put("serializer.column", "test");
+    Configurables.configure(sink, ctx);
+    // ctx is reused below for the channel (and presumably by other tests):
+    // reset batch size, serializer and coalescing to their usual values.
+    ctx.put("batchSize", "100");
+    ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName());
+    ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+      "false");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, ctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 3; j++) {
+        Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+        channel.put(e);
+      }
+    }
+    tx.commit();
+    tx.close();
+    int count = 0;
+    Status status = Status.READY;
+    while (status != Status.BACKOFF) {
+      count++;
+      status = sink.process();
+    }
+    Assert.assertFalse(sink.isConfNull());
+    sink.stop();
+    Assert.assertEquals(7, count);
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    Scan scan = new Scan();
+    scan.addColumn(columnFamily.getBytes(),"test".getBytes());
+    scan.setStartRow(Bytes.toBytes(valBase));
+    ResultScanner rs = table.getScanner(scan);
+    int i = 0;
+    try {
+      for (Result r = rs.next(); r != null; r = rs.next()) {
+        byte[] out = r.getValue(columnFamily.getBytes(), "test".getBytes());
+        Assert.assertArrayEquals(Longs.toByteArray(3), out);
+        Assert.assertTrue(new String(r.getRow()).startsWith(valBase));
+        i++;
+      }
+    } finally {
+      rs.close();
+      table.close();
+    }
+    Assert.assertEquals(4, i);
+    // 12 events in 6 batches of 2. Without coalescing each event is its own
+    // increment (12 callbacks); with coalescing, equal rows inside a batch
+    // merge into one increment: 1+2+1+1+2+1 = 8 callbacks.
+    Assert.assertEquals(coalesce ? 8 : 12,
+      sink.getTotalCallbacksReceived());
+  }
+
+  @Test
   public void testWithoutConfigurationObject() throws Exception{
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;