You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/06 02:27:30 UTC
git commit: FLUME-2134. AsyncHBaseSink should use
ZKConfig.getZKQuorumServersString plus test fixes on Windows
Updated Branches:
refs/heads/trunk 41f1e8afb -> 5d49eeb73
FLUME-2134. AsyncHBaseSink should use ZKConfig.getZKQuorumServersString plus test fixes on Windows
(Roshan Naik via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5d49eeb7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5d49eeb7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5d49eeb7
Branch: refs/heads/trunk
Commit: 5d49eeb734d1c55aaf48219699706851d9e820b5
Parents: 41f1e8a
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Aug 5 17:25:53 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Aug 5 17:25:53 2013 -0700
----------------------------------------------------------------------
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 3 +-
.../flume/sink/hbase/TestAsyncHBaseSink.java | 83 ++------------------
2 files changed, 8 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 7020fcd..5e297b1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -37,6 +37,7 @@ import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.PutRequest;
@@ -318,7 +319,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
if (conf == null) { //In tests, we pass the conf in.
conf = HBaseConfiguration.create();
}
- zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ zkQuorum = ZKConfig.getZKQuorumServersString(conf);
zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 7ddfdae..a0c04eb 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -19,14 +19,11 @@
package org.apache.flume.sink.hbase;
-
-import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -37,35 +34,26 @@ import org.apache.flume.Sink.Status;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import com.google.common.io.Files;
import com.google.common.primitives.Longs;
-import java.lang.reflect.Method;
import org.junit.After;
public class TestAsyncHBaseSink {
- private static HBaseTestingUtility testUtility;
- private static MiniZooKeeperCluster zookeeperCluster;
- private static MiniHBaseCluster hbaseCluster;
- private static String workDir = Files.createTempDir().getAbsolutePath();
+ private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
private static String tableName = "TestHbaseSink";
private static String columnFamily = "TestColumnFamily";
@@ -78,65 +66,8 @@ public class TestAsyncHBaseSink {
@BeforeClass
public static void setUp() throws Exception {
+ testUtility.startMiniCluster();
- /*
- * Borrowed from HCatalog ManyMiniCluster.java
- * https://svn.apache.org/repos/asf/incubator/hcatalog/trunk/
- * storage-handlers/hbase/src/test/org/apache/hcatalog/
- * hbase/ManyMiniCluster.java
- *
- */
- String hbaseDir = new File(workDir,"hbase").getAbsolutePath();
- String hbaseRoot = "file://" + hbaseDir;
- Configuration hbaseConf = HBaseConfiguration.create();
-
- hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
- hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
- hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
- hbaseConf.setInt("hbase.master.info.port", -1);
- hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns",500);
- String zookeeperDir = new File(workDir,"zk").getAbsolutePath();
- int zookeeperPort = 2181;
- zookeeperCluster = new MiniZooKeeperCluster();
- Method m;
- Class<?> zkParam[] = {Integer.TYPE};
- try{
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
- zkParam);
- } catch (NoSuchMethodException e) {
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
- zkParam);
- }
-
- m.invoke(zookeeperCluster, new Object[] {new Integer(zookeeperPort)});
- zookeeperCluster.startup(new File(zookeeperDir));
- hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
- HMaster master = hbaseCluster.getMaster();
- Object serverName = master.getServerName();
- String hostAndPort;
- if(serverName instanceof String) {
- System.out.println("Server name is string, using HServerAddress.");
- m = HMaster.class.getDeclaredMethod("getMasterAddress",
- new Class<?>[]{});
- Class<?> clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress");
- /*
- * Call method to get server address
- */
- Object serverAddr = clazz.cast(m.invoke(master, new Object[]{}));
- //returns the address as hostname:port
- hostAndPort = serverAddr.toString();
- } else {
- System.out.println("ServerName is org.apache.hadoop.hbase.ServerName," +
- "using getHostAndPort()");
- Class<?> clazz = Class.forName("org.apache.hadoop.hbase.ServerName");
- m = clazz.getDeclaredMethod("getHostAndPort", new Class<?>[] {});
- hostAndPort = m.invoke(serverName, new Object[]{}).toString();
- }
-
- hbaseConf.set("hbase.master", hostAndPort);
- testUtility = new HBaseTestingUtility(hbaseConf);
- testUtility.setZkCluster(zookeeperCluster);
- hbaseCluster.startMaster();
Map<String, String> ctxMap = new HashMap<String, String>();
ctxMap.put("table", tableName);
ctxMap.put("columnFamily", columnFamily);
@@ -151,9 +82,7 @@ public class TestAsyncHBaseSink {
@AfterClass
public static void tearDown() throws Exception {
- hbaseCluster.shutdown();
- zookeeperCluster.shutdown();
- FileUtils.deleteDirectory(new File(workDir));
+ testUtility.shutdownMiniCluster();
}
@After
@@ -347,7 +276,7 @@ public class TestAsyncHBaseSink {
deleteTable = true;
ctx.put("batchSize", "2");
ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
- testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+ ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) );
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
AsyncHBaseSink sink = new AsyncHBaseSink();
@@ -485,7 +414,7 @@ public class TestAsyncHBaseSink {
Assert.assertEquals(2, found);
out = results[2];
Assert.assertArrayEquals(Longs.toByteArray(2), out);
- hbaseCluster.shutdown();
+ testUtility.shutdownMiniCluster();
sink.process();
sink.stop();
}