You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2014/03/01 01:27:37 UTC

git commit: FLUME-2324: Support writing to multiple HBase clusters using HBaseSink

Repository: flume
Updated Branches:
  refs/heads/trunk 9688cad6f -> 96f6b6284


FLUME-2324: Support writing to multiple HBase clusters using HBaseSink

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/96f6b628
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/96f6b628
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/96f6b628

Branch: refs/heads/trunk
Commit: 96f6b6284c6e8a645b122111059ee954e9bad7b5
Parents: 9688cad
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Feb 28 16:25:57 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Feb 28 16:25:57 2014 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  2 +
 .../org/apache/flume/sink/hbase/HBaseSink.java  | 48 +++++++++++
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 83 +++++++++++++++++++-
 3 files changed, 131 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2cd0996..96bf73e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1836,6 +1836,8 @@ Property Name       Default                                                 Desc
 **type**            --                                                      The component type name, needs to be ``hbase``
 **table**           --                                                      The name of the table in Hbase to write to.
 **columnFamily**    --                                                      The column family in Hbase to write to.
+zookeeperQuorum     --                                                      The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent         /hbase                                                  The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
 batchSize           100                                                     Number of events to be written per txn.
 serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  Default increment column = "iCol", payload column = "pCol".
 serializer.*        --                                                      Properties to be passed to the serializer.

http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index f5cb229..c4a666c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
@@ -221,9 +222,56 @@ public class HBaseSink extends AbstractSink implements Configurable {
         "writes to HBase will have WAL disabled, and any data in the " +
         "memstore of this region in the Region Server could be lost!");
     }
+    String zkQuorum = context.getString(HBaseSinkConfigurationConstants
+      .ZK_QUORUM);
+    Integer port = null;
+    /**
+     * HBase allows multiple nodes in the quorum, but all need to use the
+     * same client port. So get the nodes in host:port format,
+     * and ignore the ports for all nodes except the first one. If no port is
+     * specified, use default.
+     */
+    if (zkQuorum != null && !zkQuorum.isEmpty()) {
+      StringBuilder zkBuilder = new StringBuilder();
+      logger.info("Using ZK Quorum: " + zkQuorum);
+      String[] zkHosts = zkQuorum.split(",");
+      int length = zkHosts.length;
+      for(int i = 0; i < length; i++) {
+        String[] zkHostAndPort = zkHosts[i].split(":");
+        zkBuilder.append(zkHostAndPort[0].trim());
+        if(i != length-1) {
+          zkBuilder.append(",");
+        } else {
+          zkQuorum = zkBuilder.toString();
+        }
+        if (zkHostAndPort[1] == null) {
+          throw new FlumeException("Expected client port for the ZK node!");
+        }
+        if (port == null) {
+          port = Integer.parseInt(zkHostAndPort[1].trim());
+        } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
+          throw new FlumeException("All Zookeeper nodes in the quorum must " +
+            "use the same client port.");
+        }
+      }
+      if(port == null) {
+        port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+      }
+      this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+      this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
+    }
+    String hbaseZnode = context.getString(
+      HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
+    if(hbaseZnode != null && !hbaseZnode.isEmpty()) {
+      this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
+    }
     sinkCounter = new SinkCounter(this.getName());
   }
 
+  public Configuration getConfig() {
+    return config;
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
     Status status = Status.READY;

http://git-wip-us.apache.org/repos/asf/flume/blob/96f6b628/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index 068f543..cb7c6ea 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -77,6 +78,8 @@ public class TestHBaseSink {
     testUtility.shutdownMiniCluster();
   }
 
+
+
   @Test
   public void testOneEventWithDefaults() throws Exception {
     //Create a context without setting increment column and payload Column
@@ -90,7 +93,7 @@ public class TestHBaseSink {
 
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
-    Configurables.configure(sink, tmpctx);
+    Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
     sink.setChannel(channel);
@@ -440,6 +443,82 @@ public class TestHBaseSink {
     testUtility.deleteTable(tableName.getBytes());
   }
 
+  @Test
+  public void testWithoutConfigurationObject() throws Exception{
+    ctx.put("batchSize", "2");
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
+      ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) );
+    System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM));
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    HBaseSink sink = new HBaseSink();
+    Configurables.configure(sink, ctx);
+    // Reset context to values usable by other tests.
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, ctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for(int i = 0; i < 3; i++){
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    Status status = Status.READY;
+    while(status != Status.BACKOFF){
+      status = sink.process();
+    }
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 3);
+    byte[] out;
+    int found = 0;
+    for(int i = 0; i < 3; i++){
+      for(int j = 0; j < 3; j++){
+        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+          found++;
+          break;
+        }
+      }
+    }
+    Assert.assertEquals(3, found);
+    out = results[3];
+    Assert.assertArrayEquals(Longs.toByteArray(3), out);
+  }
 
-}
+  @Test
+  public void testZKQuorum() throws Exception{
+    String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
+      "zk3.flume.apache.org:3342";
+    ctx.put("batchSize", "2");
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    HBaseSink sink = new HBaseSink();
+    Configurables.configure(sink, ctx);
+    Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
+      "zk3.flume.apache.org", sink.getConfig().get(HConstants
+      .ZOOKEEPER_QUORUM));
+    Assert.assertEquals(String.valueOf(3342), sink.getConfig().get(HConstants
+      .ZOOKEEPER_CLIENT_PORT));
+  }
 
+  @Test (expected = FlumeException.class)
+  public void testZKQuorumIncorrectPorts() throws Exception{
+    String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
+      "zk3.flume.apache.org:3342";
+    ctx.put("batchSize", "2");
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    HBaseSink sink = new HBaseSink();
+    Configurables.configure(sink, ctx);
+    Assert.fail();
+  }
+}
\ No newline at end of file