You are viewing a plain text version of this content. (The link to the canonical version, shown as "here" in the original, was lost in the plain-text conversion.)
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/10 19:24:43 UTC
git commit: FLUME-1821: Support configuration of hbase instances to be used in AsyncHBaseSink from flume config
Updated Branches:
refs/heads/trunk 1a2e0d7a7 -> 32fef9342
FLUME-1821: Support configuration of hbase instances to be used in AsyncHBaseSink from flume config
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/32fef934
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/32fef934
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/32fef934
Branch: refs/heads/trunk
Commit: 32fef9342caedcb243a32413473caec91becfa09
Parents: 1a2e0d7
Author: Brock Noland <br...@apache.org>
Authored: Thu Jan 10 10:24:10 2013 -0800
Committer: Brock Noland <br...@apache.org>
Committed: Thu Jan 10 10:24:10 2013 -0800
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 11 +++-
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 30 ++++++-
.../hbase/HBaseSinkConfigurationConstants.java | 10 +++
.../flume/sink/hbase/TestAsyncHBaseSink.java | 63 ++++++++++++++-
4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 63b8f9b..58a115e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1526,7 +1526,6 @@ HBase puts and/or increments. These puts and increments are then written
to HBase. This sink provides the same consistency guarantees as HBase,
which is currently row-wise atomicity. In the event of Hbase failing to
write certain events, the sink will replay all events in that transaction.
-This sink is still experimental.
The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
Required properties are in **bold**.
@@ -1536,6 +1535,8 @@ Property Name Default
**channel** --
**type** -- The component type name, needs to be ``org.apache.flume.sink.hbase.AsyncHBaseSink``
**table** -- The name of the table in Hbase to write to.
+zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
**columnFamily** -- The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
timeout -- The length of time (in milliseconds) the sink waits for acks from hbase for
@@ -1544,6 +1545,14 @@ serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.* -- Properties to be passed to the serializer.
================ ============================================================ ====================================================================================
+Note that this sink takes the Zookeeper Quorum and parent znode information in
+the configuration. Zookeeper Quorum and parent node configuration may be
+specified in the flume configuration file, alternatively these configuration
+values are taken from the first hbase-site.xml file in the classpath.
+
+If these are not provided in the configuration, then the sink
+will read this information from the first hbase-site.xml file in the classpath.
+
Example for agent named a1:
.. code-block:: properties
http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 1598f26..6b34873 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -105,14 +106,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private volatile boolean open = false;
private SinkCounter sinkCounter;
private long timeout;
+ private String zkQuorum;
+ private String zkBaseDir;
public AsyncHBaseSink(){
- conf = HBaseConfiguration.create();
+ this(null);
}
public AsyncHBaseSink(Configuration conf) {
this.conf = conf;
}
+
@Override
public Status process() throws EventDeliveryException {
/*
@@ -284,6 +288,27 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "Sink will not timeout.");
timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
}
+
+ zkQuorum = context.getString(
+ HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
+ if(!zkQuorum.isEmpty()) {
+ zkBaseDir = context.getString(
+ HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+ HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
+ } else {
+ if (conf == null) { //In tests, we pass the conf in.
+ conf = HBaseConfiguration.create();
+ }
+ zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ }
+ Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
+ "The Zookeeper quorum cannot be null and should be specified.");
+ }
+
+ @VisibleForTesting
+ boolean isConfNull() {
+ return conf == null;
}
@Override
public void start(){
@@ -291,8 +316,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "before calling start on an old instance.");
sinkCounter.start();
sinkCounter.incrementConnectionCreatedCount();
- String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
- String zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
if(zkBaseDir != null){
client = new HBaseClient(zkQuorum, zkBaseDir);
} else {
@@ -344,6 +367,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
client = null;
+ conf = null;
open = false;
super.stop();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 463c9c3..fad026c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -17,6 +17,9 @@
* under the License.
*/
package org.apache.flume.sink.hbase;
+
+import org.apache.hadoop.hbase.HConstants;
+
/**
* Constants used for configuration of HBaseSink and AsyncHBaseSink
*
@@ -52,4 +55,11 @@ public class HBaseSinkConfigurationConstants {
public static final String CONFIG_PRINCIPAL = "kerberosPrincipal";
+ public static final String ZK_QUORUM = "zookeeperQuorum";
+
+ public static final String ZK_ZNODE_PARENT = "znodeParent";
+
+ public static final String DEFAULT_ZK_ZNODE_PARENT =
+ HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/32fef934/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index c835172..1f61406 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -167,7 +167,7 @@ public class TestAsyncHBaseSink {
channel.put(e);
tx.commit();
tx.close();
-
+ Assert.assertFalse(sink.isConfNull());
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -196,6 +196,7 @@ public class TestAsyncHBaseSink {
}
tx.commit();
tx.close();
+ Assert.assertFalse(sink.isConfNull());
sink.process();
sink.stop();
HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -242,6 +243,64 @@ public class TestAsyncHBaseSink {
count++;
status = sink.process();
}
+ Assert.assertFalse(sink.isConfNull());
+ sink.stop();
+ Assert.assertEquals(2, count);
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ byte[][] results = getResults(table, 3);
+ byte[] out;
+ int found = 0;
+ for(int i = 0; i < 3; i++){
+ for(int j = 0; j < 3; j++){
+ if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+ found++;
+ break;
+ }
+ }
+ }
+ Assert.assertEquals(3, found);
+ out = results[3];
+ Assert.assertArrayEquals(Longs.toByteArray(3), out);
+ testUtility.deleteTable(tableName.getBytes());
+ }
+
+ @Test
+ public void testWithoutConfigurationObject() throws Exception{
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ ctx.put("batchSize", "2");
+ ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
+ testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+ ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+ testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ AsyncHBaseSink sink = new AsyncHBaseSink();
+ Configurables.configure(sink, ctx);
+ // Reset context to values usable by other tests.
+ ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null);
+ ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
+ ctx.put("batchSize", "100");
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for(int i = 0; i < 3; i++){
+ Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+ channel.put(e);
+ }
+ tx.commit();
+ tx.close();
+ int count = 0;
+ Status status = Status.READY;
+ while(status != Status.BACKOFF){
+ count++;
+ status = sink.process();
+ }
+ /*
+ * Make sure that the configuration was picked up from the context itself
+ * and not from a configuration object which was created by the sink.
+ */
+ Assert.assertTrue(sink.isConfNull());
sink.stop();
Assert.assertEquals(2, count);
HTable table = new HTable(testUtility.getConfiguration(), tableName);
@@ -282,6 +341,7 @@ public class TestAsyncHBaseSink {
tx.commit();
tx.close();
sink.process();
+ Assert.assertFalse(sink.isConfNull());
HTable table = new HTable(testUtility.getConfiguration(), tableName);
byte[][] results = getResults(table, 2);
byte[] out;
@@ -330,6 +390,7 @@ public class TestAsyncHBaseSink {
tx.commit();
tx.close();
sink.process();
+ Assert.assertFalse(sink.isConfNull());
HTable table = new HTable(testUtility.getConfiguration(), tableName);
byte[][] results = getResults(table, 2);
byte[] out;