You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2014/03/01 01:27:37 UTC
git commit: FLUME-2324: Support writing to multiple HBase clusters
using HBaseSink
Repository: flume
Updated Branches:
refs/heads/trunk 9688cad6f -> 96f6b6284
FLUME-2324: Support writing to multiple HBase clusters using HBaseSink
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/96f6b628
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/96f6b628
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/96f6b628
Branch: refs/heads/trunk
Commit: 96f6b6284c6e8a645b122111059ee954e9bad7b5
Parents: 9688cad
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Feb 28 16:25:57 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Feb 28 16:25:57 2014 -0800
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +
.../org/apache/flume/sink/hbase/HBaseSink.java | 48 +++++++++++
.../apache/flume/sink/hbase/TestHBaseSink.java | 83 +++++++++++++++++++-
3 files changed, 131 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2cd0996..96bf73e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1836,6 +1836,8 @@ Property Name Default Desc
**type** -- The component type name, needs to be ``hbase``
**table** -- The name of the table in Hbase to write to.
**columnFamily** -- The column family in Hbase to write to.
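+zookeeperQuorum -- The quorum spec, given as a comma-separated list of ``host:port`` entries that must all use the same client port. The hosts correspond to the value of the property ``hbase.zookeeper.quorum`` in hbase-site.xml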
+znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
batchSize 100 Number of events to be written per txn.
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol".
serializer.* -- Properties to be passed to the serializer.
http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index f5cb229..c4a666c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
@@ -221,9 +222,56 @@ public class HBaseSink extends AbstractSink implements Configurable {
"writes to HBase will have WAL disabled, and any data in the " +
"memstore of this region in the Region Server could be lost!");
}
+ String zkQuorum = context.getString(HBaseSinkConfigurationConstants
+ .ZK_QUORUM);
+ Integer port = null;
+ /**
+ * HBase allows multiple nodes in the quorum, but all need to use the
+ * same client port. So get the nodes in host:port format,
+ * and ignore the ports for all nodes except the first one. If no port is
+ * specified, use default.
+ */
+ if (zkQuorum != null && !zkQuorum.isEmpty()) {
+ StringBuilder zkBuilder = new StringBuilder();
+ logger.info("Using ZK Quorum: " + zkQuorum);
+ String[] zkHosts = zkQuorum.split(",");
+ int length = zkHosts.length;
+ for(int i = 0; i < length; i++) {
+ String[] zkHostAndPort = zkHosts[i].split(":");
+ zkBuilder.append(zkHostAndPort[0].trim());
+ if(i != length-1) {
+ zkBuilder.append(",");
+ } else {
+ zkQuorum = zkBuilder.toString();
+ }
+ if (zkHostAndPort[1] == null) {
+ throw new FlumeException("Expected client port for the ZK node!");
+ }
+ if (port == null) {
+ port = Integer.parseInt(zkHostAndPort[1].trim());
+ } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
+ throw new FlumeException("All Zookeeper nodes in the quorum must " +
+ "use the same client port.");
+ }
+ }
+ if(port == null) {
+ port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+ }
+ this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+ this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
+ }
+ String hbaseZnode = context.getString(
+ HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
+ if(hbaseZnode != null && !hbaseZnode.isEmpty()) {
+ this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
+ }
sinkCounter = new SinkCounter(this.getName());
}
+  /**
+   * Exposes the HBase {@link Configuration} held by this sink, including any
+   * ZooKeeper quorum, client port and znode parent values applied to it
+   * during configuration.
+   *
+   * @return the configuration instance used by this sink (not a copy)
+   */
+  public Configuration getConfig() {
+    return this.config;
+  }
+
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index 068f543..cb7c6ea 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -77,6 +78,8 @@ public class TestHBaseSink {
testUtility.shutdownMiniCluster();
}
+
+
@Test
public void testOneEventWithDefaults() throws Exception {
//Create a context without setting increment column and payload Column
@@ -90,7 +93,7 @@ public class TestHBaseSink {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
- Configurables.configure(sink, tmpctx);
+ Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
sink.setChannel(channel);
@@ -440,6 +443,82 @@ public class TestHBaseSink {
testUtility.deleteTable(tableName.getBytes());
}
+  /**
+   * Verifies that a sink created with the no-arg constructor can write to the
+   * mini cluster when the ZooKeeper quorum and znode parent are supplied only
+   * through the sink context.
+   *
+   * Three events are written with a batch size of 2; the test then expects the
+   * three payloads plus an increment value of 3 in the table.
+   */
+  @Test
+  public void testWithoutConfigurationObject() throws Exception {
+    // Work on a private copy so the ZK overrides and the small batch size can
+    // never leak into the shared ctx used by the other tests, even when
+    // configure() throws.
+    Context tmpContext = new Context(ctx.getParameters());
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
+      ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()));
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    try {
+      HBaseSink sink = new HBaseSink();
+      Configurables.configure(sink, tmpContext);
+      Channel channel = new MemoryChannel();
+      Configurables.configure(channel, ctx);
+      sink.setChannel(channel);
+      sink.start();
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+      for (int i = 0; i < 3; i++) {
+        Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+        channel.put(e);
+      }
+      tx.commit();
+      tx.close();
+      // Drain the channel: the sink reports BACKOFF once nothing is left.
+      Status status = Status.READY;
+      while (status != Status.BACKOFF) {
+        status = sink.process();
+      }
+      sink.stop();
+      HTable table = new HTable(testUtility.getConfiguration(), tableName);
+      byte[][] results = getResults(table, 3);
+      int found = 0;
+      // Payload order is not assumed, so look for each expected body.
+      for (int i = 0; i < 3; i++) {
+        for (int j = 0; j < 3; j++) {
+          if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
+            found++;
+            break;
+          }
+        }
+      }
+      Assert.assertEquals(3, found);
+      // The slot after the payloads holds the increment value.
+      byte[] out = results[3];
+      Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    } finally {
+      // Drop the table so later tests can create it again.
+      testUtility.deleteTable(tableName.getBytes());
+    }
+  }
-}
+  /**
+   * Checks that configuring the sink with a {@code host:port} quorum list
+   * stores the bare, comma-separated host names under
+   * {@code HConstants.ZOOKEEPER_QUORUM} and the shared port under
+   * {@code HConstants.ZOOKEEPER_CLIENT_PORT}.
+   */
+  @Test
+  public void testZKQuorum() throws Exception {
+    // Use a private copy of the shared context: the quorum below points at
+    // hosts that do not exist and must not be seen by any other test.
+    Context tmpContext = new Context(ctx.getParameters());
+    String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
+      "zk3.flume.apache.org:3342";
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    HBaseSink sink = new HBaseSink();
+    Configurables.configure(sink, tmpContext);
+    Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
+      "zk3.flume.apache.org", sink.getConfig().get(HConstants
+      .ZOOKEEPER_QUORUM));
+    Assert.assertEquals(String.valueOf(3342), sink.getConfig().get(HConstants
+      .ZOOKEEPER_CLIENT_PORT));
+  }
+  /**
+   * Checks that configuring the sink fails with a {@code FlumeException}
+   * when the quorum entries do not all use the same client port.
+   */
+  @Test (expected = FlumeException.class)
+  public void testZKQuorumIncorrectPorts() throws Exception {
+    // Use a private copy of the shared context: the deliberately invalid
+    // quorum must not stay behind and break configure() in other tests.
+    Context tmpContext = new Context(ctx.getParameters());
+    String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
+      "zk3.flume.apache.org:3342";
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    HBaseSink sink = new HBaseSink();
+    Configurables.configure(sink, tmpContext);
+    // Reaching this line means configure() accepted the mismatched ports.
+    Assert.fail();
+  }
+}
\ No newline at end of file