You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/06 02:27:30 UTC

git commit: FLUME-2134. AsyncHbase Sink should use ZKConfig.getZKQuorumServersString plus test fixes on Windows

Updated Branches:
  refs/heads/trunk 41f1e8afb -> 5d49eeb73


FLUME-2134. AsyncHbase Sink should use ZKConfig.getZKQuorumServersString plus test fixes on Windows

(Roshan Naik via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5d49eeb7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5d49eeb7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5d49eeb7

Branch: refs/heads/trunk
Commit: 5d49eeb734d1c55aaf48219699706851d9e820b5
Parents: 41f1e8a
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Aug 5 17:25:53 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Aug 5 17:25:53 2013 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/AsyncHBaseSink.java |  3 +-
 .../flume/sink/hbase/TestAsyncHBaseSink.java    | 83 ++------------------
 2 files changed, 8 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 7020fcd..5e297b1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -37,6 +37,7 @@ import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.hbase.async.AtomicIncrementRequest;
 import org.hbase.async.HBaseClient;
 import org.hbase.async.PutRequest;
@@ -318,7 +319,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       if (conf == null) { //In tests, we pass the conf in.
         conf = HBaseConfiguration.create();
       }
-      zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+      zkQuorum = ZKConfig.getZKQuorumServersString(conf);
       zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/5d49eeb7/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 7ddfdae..a0c04eb 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -19,14 +19,11 @@
 
 package org.apache.flume.sink.hbase;
 
-
-import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -37,35 +34,26 @@ import org.apache.flume.Sink.Status;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.io.Files;
 import com.google.common.primitives.Longs;
-import java.lang.reflect.Method;
 
 import org.junit.After;
 
 public class TestAsyncHBaseSink {
-  private static HBaseTestingUtility testUtility;
-  private static MiniZooKeeperCluster zookeeperCluster;
-  private static MiniHBaseCluster hbaseCluster;
-  private static String workDir = Files.createTempDir().getAbsolutePath();
+  private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
 
   private static String tableName = "TestHbaseSink";
   private static String columnFamily = "TestColumnFamily";
@@ -78,65 +66,8 @@ public class TestAsyncHBaseSink {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    testUtility.startMiniCluster();
 
-    /*
-     * Borrowed from HCatalog ManyMiniCluster.java
-     * https://svn.apache.org/repos/asf/incubator/hcatalog/trunk/
-     * storage-handlers/hbase/src/test/org/apache/hcatalog/
-     * hbase/ManyMiniCluster.java
-     *
-     */
-    String hbaseDir = new File(workDir,"hbase").getAbsolutePath();
-    String hbaseRoot = "file://" + hbaseDir;
-    Configuration hbaseConf =  HBaseConfiguration.create();
-
-    hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
-    hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
-    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
-    hbaseConf.setInt("hbase.master.info.port", -1);
-    hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns",500);
-    String zookeeperDir = new File(workDir,"zk").getAbsolutePath();
-    int zookeeperPort = 2181;
-    zookeeperCluster = new MiniZooKeeperCluster();
-    Method m;
-    Class<?> zkParam[] = {Integer.TYPE};
-    try{
-      m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
-          zkParam);
-    } catch (NoSuchMethodException e) {
-      m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
-          zkParam);
-    }
-
-    m.invoke(zookeeperCluster, new Object[] {new Integer(zookeeperPort)});
-    zookeeperCluster.startup(new File(zookeeperDir));
-    hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
-    HMaster master = hbaseCluster.getMaster();
-    Object serverName = master.getServerName();
-    String hostAndPort;
-    if(serverName instanceof String) {
-      System.out.println("Server name is string, using HServerAddress.");
-      m = HMaster.class.getDeclaredMethod("getMasterAddress",
-          new Class<?>[]{});
-      Class<?> clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress");
-      /*
-       * Call method to get server address
-       */
-      Object serverAddr = clazz.cast(m.invoke(master, new Object[]{}));
-      //returns the address as hostname:port
-      hostAndPort = serverAddr.toString();
-    } else {
-      System.out.println("ServerName is org.apache.hadoop.hbase.ServerName," +
-          "using getHostAndPort()");
-      Class<?> clazz = Class.forName("org.apache.hadoop.hbase.ServerName");
-      m = clazz.getDeclaredMethod("getHostAndPort", new Class<?>[] {});
-      hostAndPort = m.invoke(serverName, new Object[]{}).toString();
-    }
-
-    hbaseConf.set("hbase.master", hostAndPort);
-    testUtility = new HBaseTestingUtility(hbaseConf);
-    testUtility.setZkCluster(zookeeperCluster);
-    hbaseCluster.startMaster();
     Map<String, String> ctxMap = new HashMap<String, String>();
     ctxMap.put("table", tableName);
     ctxMap.put("columnFamily", columnFamily);
@@ -151,9 +82,7 @@ public class TestAsyncHBaseSink {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    hbaseCluster.shutdown();
-    zookeeperCluster.shutdown();
-    FileUtils.deleteDirectory(new File(workDir));
+    testUtility.shutdownMiniCluster();
   }
 
   @After
@@ -347,7 +276,7 @@ public class TestAsyncHBaseSink {
     deleteTable = true;
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
-        testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+            ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) );
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
       testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     AsyncHBaseSink sink = new AsyncHBaseSink();
@@ -485,7 +414,7 @@ public class TestAsyncHBaseSink {
     Assert.assertEquals(2, found);
     out = results[2];
     Assert.assertArrayEquals(Longs.toByteArray(2), out);
-    hbaseCluster.shutdown();
+    testUtility.shutdownMiniCluster();
     sink.process();
     sink.stop();
   }