You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/10 19:24:43 UTC

git commit: FLUME-1821: Support configuration of hbase instances to be used in AsyncHBaseSink from flume config

Updated Branches:
  refs/heads/trunk 1a2e0d7a7 -> 32fef9342


FLUME-1821: Support configuration of hbase instances to be used in AsyncHBaseSink from flume config

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/32fef934
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/32fef934
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/32fef934

Branch: refs/heads/trunk
Commit: 32fef9342caedcb243a32413473caec91becfa09
Parents: 1a2e0d7
Author: Brock Noland <br...@apache.org>
Authored: Thu Jan 10 10:24:10 2013 -0800
Committer: Brock Noland <br...@apache.org>
Committed: Thu Jan 10 10:24:10 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   11 +++-
 .../apache/flume/sink/hbase/AsyncHBaseSink.java    |   30 ++++++-
 .../hbase/HBaseSinkConfigurationConstants.java     |   10 +++
 .../flume/sink/hbase/TestAsyncHBaseSink.java       |   63 ++++++++++++++-
 4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 63b8f9b..58a115e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1526,7 +1526,6 @@ HBase puts and/or increments. These puts and increments are then written
 to HBase. This sink provides the same consistency guarantees as HBase,
 which is currently row-wise atomicity. In the event of Hbase failing to
 write certain events, the sink will replay all events in that transaction.
-This sink is still experimental.
 The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
 Required properties are in **bold**.
 
@@ -1536,6 +1535,8 @@ Property Name     Default
 **channel**       --
 **type**          --                                                            The component type name, needs to be ``org.apache.flume.sink.hbase.AsyncHBaseSink``
 **table**         --                                                            The name of the table in Hbase to write to.
+zookeeperQuorum   --                                                            The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent       /hbase                                                        The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
 **columnFamily**  --                                                            The column family in Hbase to write to.
 batchSize         100                                                           Number of events to be written per txn.
 timeout           --                                                            The length of time (in milliseconds) the sink waits for acks from hbase for
@@ -1544,6 +1545,14 @@ serializer        org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 serializer.*      --                                                            Properties to be passed to the serializer.
 ================  ============================================================  ====================================================================================
 
+Note that this sink takes the Zookeeper Quorum and parent znode information in
+the configuration. Zookeeper Quorum and parent node configuration may be
+specified in the flume configuration file; alternatively, these configuration
+values are taken from the first hbase-site.xml file in the classpath.
+
+If these are not provided in the flume configuration, then the sink
+will read this information from the first hbase-site.xml file in the classpath.
+
 Example for agent named a1:
 
 .. code-block:: properties

http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 1598f26..6b34873 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -105,14 +106,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private volatile boolean open = false;
   private SinkCounter sinkCounter;
   private long timeout;
+  private String zkQuorum;
+  private String zkBaseDir;
 
   public AsyncHBaseSink(){
-    conf = HBaseConfiguration.create();
+    this(null);
   }
 
   public AsyncHBaseSink(Configuration conf) {
     this.conf = conf;
   }
+
   @Override
   public Status process() throws EventDeliveryException {
     /*
@@ -284,6 +288,27 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
               + "Sink will not timeout.");
       timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
     }
+
+    zkQuorum = context.getString(
+        HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
+    if(!zkQuorum.isEmpty()) {
+      zkBaseDir = context.getString(
+          HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+          HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
+    } else {
+      if (conf == null) { //In tests, we pass the conf in.
+        conf = HBaseConfiguration.create();
+      }
+      zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+    }
+    Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
+        "The Zookeeper quorum cannot be null and should be specified.");
+  }
+
+  @VisibleForTesting
+  boolean isConfNull() {
+    return conf == null;
   }
   @Override
   public void start(){
@@ -291,8 +316,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
             + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
-    String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
-    String zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
     if(zkBaseDir != null){
       client = new HBaseClient(zkQuorum, zkBaseDir);
     } else {
@@ -344,6 +367,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
     client = null;
+    conf = null;
     open = false;
     super.stop();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 463c9c3..fad026c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -17,6 +17,9 @@
  * under the License.
  */
 package org.apache.flume.sink.hbase;
+
+import org.apache.hadoop.hbase.HConstants;
+
 /**
  * Constants used for configuration of HBaseSink and AsyncHBaseSink
  *
@@ -52,4 +55,11 @@ public class HBaseSinkConfigurationConstants {
 
   public static final String CONFIG_PRINCIPAL = "kerberosPrincipal";
 
+  public static final String ZK_QUORUM = "zookeeperQuorum";
+
+  public static final String ZK_ZNODE_PARENT = "znodeParent";
+
+  public static final String DEFAULT_ZK_ZNODE_PARENT =
+      HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index c835172..1f61406 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -167,7 +167,7 @@ public class TestAsyncHBaseSink {
     channel.put(e);
     tx.commit();
     tx.close();
-
+    Assert.assertFalse(sink.isConfNull());
     sink.process();
     sink.stop();
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -196,6 +196,7 @@ public class TestAsyncHBaseSink {
     }
     tx.commit();
     tx.close();
+    Assert.assertFalse(sink.isConfNull());
     sink.process();
     sink.stop();
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -242,6 +243,64 @@ public class TestAsyncHBaseSink {
       count++;
       status = sink.process();
     }
+    Assert.assertFalse(sink.isConfNull());
+    sink.stop();
+    Assert.assertEquals(2, count);
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 3);
+    byte[] out;
+    int found = 0;
+    for(int i = 0; i < 3; i++){
+      for(int j = 0; j < 3; j++){
+        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+          found++;
+          break;
+        }
+      }
+    }
+    Assert.assertEquals(3, found);
+    out = results[3];
+    Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  @Test
+  public void testWithoutConfigurationObject() throws Exception{
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    ctx.put("batchSize", "2");
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
+        testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+        testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    AsyncHBaseSink sink = new AsyncHBaseSink();
+    Configurables.configure(sink, ctx);
+    // Reset context to values usable by other tests.
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for(int i = 0; i < 3; i++){
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    int count = 0;
+    Status status = Status.READY;
+    while(status != Status.BACKOFF){
+      count++;
+      status = sink.process();
+    }
+    /*
+     * Make sure that the configuration was picked up from the context itself
+     * and not from a configuration object which was created by the sink.
+     */
+    Assert.assertTrue(sink.isConfNull());
     sink.stop();
     Assert.assertEquals(2, count);
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -282,6 +341,7 @@ public class TestAsyncHBaseSink {
     tx.commit();
     tx.close();
     sink.process();
+    Assert.assertFalse(sink.isConfNull());
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
@@ -330,6 +390,7 @@ public class TestAsyncHBaseSink {
     tx.commit();
     tx.close();
     sink.process();
+    Assert.assertFalse(sink.isConfNull());
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;