You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2014/03/01 16:27:39 UTC

git commit: FLUME-2335: TestHBaseSink#testWithoutConfigurationObject() must delete the table at the end of the test

Repository: flume
Updated Branches:
  refs/heads/trunk 96f6b6284 -> ad5f286ea


FLUME-2335: TestHBaseSink#testWithoutConfigurationObject() must delete the table at the end of the test

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ad5f286e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ad5f286e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ad5f286e

Branch: refs/heads/trunk
Commit: ad5f286eafd0912e9db0b2c3d92d1dc408377b12
Parents: 96f6b62
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Mar 1 07:27:10 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Mar 1 07:27:10 2014 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 46 +++++++++++---------
 1 file changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ad5f286e/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index cb7c6ea..20b7fe5 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -36,6 +36,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
@@ -59,6 +60,7 @@ public class TestHBaseSink {
   private static String plCol = "pCol";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
+  private static Configuration conf;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -71,6 +73,7 @@ public class TestHBaseSink {
     ctxMap.put("serializer.payloadColumn", plCol);
     ctxMap.put("serializer.incrementColumn", inColumn);
     ctx.putAll(ctxMap);
+    conf = new Configuration(testUtility.getConfiguration());
   }
 
   @AfterClass
@@ -92,7 +95,7 @@ public class TestHBaseSink {
     tmpctx.putAll(ctxMap);
 
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
@@ -108,7 +111,7 @@ public class TestHBaseSink {
 
     sink.process();
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 1);
     byte[] out = results[0];
     Assert.assertArrayEquals(e.getBody(), out);
@@ -120,7 +123,7 @@ public class TestHBaseSink {
   @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
@@ -136,7 +139,7 @@ public class TestHBaseSink {
 
     sink.process();
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 1);
     byte[] out = results[0];
     Assert.assertArrayEquals(e.getBody(), out);
@@ -149,7 +152,7 @@ public class TestHBaseSink {
   public void testThreeEvents() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     ctx.put("batchSize", "3");
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
@@ -165,7 +168,7 @@ public class TestHBaseSink {
     tx.close();
     sink.process();
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
@@ -187,7 +190,7 @@ public class TestHBaseSink {
   public void testMultipleBatches() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     ctx.put("batchSize", "2");
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
@@ -209,7 +212,7 @@ public class TestHBaseSink {
     }
     sink.stop();
     Assert.assertEquals(2, count);
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
@@ -230,7 +233,7 @@ public class TestHBaseSink {
   @Test(expected = FlumeException.class)
   public void testMissingTable() throws Exception {
     ctx.put("batchSize", "2");
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
@@ -247,7 +250,7 @@ public class TestHBaseSink {
     tx.commit();
     tx.close();
     sink.process();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
@@ -279,7 +282,7 @@ public class TestHBaseSink {
   public void testHBaseFailure() throws Exception {
     ctx.put("batchSize", "2");
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
@@ -296,7 +299,7 @@ public class TestHBaseSink {
     tx.commit();
     tx.close();
     sink.process();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
@@ -373,7 +376,7 @@ public class TestHBaseSink {
   public void testTransactionStateOnChannelException() throws Exception {
     ctx.put("batchSize", "1");
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
     Channel channel = spy(new MemoryChannel());
@@ -396,7 +399,7 @@ public class TestHBaseSink {
     doReturn(e).when(channel).take();
     sink.process();
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 1);
     byte[] out = results[0];
     Assert.assertArrayEquals(e.getBody(), out);
@@ -411,7 +414,7 @@ public class TestHBaseSink {
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
         "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
     Channel channel = new MemoryChannel();
@@ -434,7 +437,7 @@ public class TestHBaseSink {
     MockSimpleHbaseEventSerializer.throwException = false;
     sink.process();
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 1);
     byte[] out = results[0];
     Assert.assertArrayEquals(e.getBody(), out);
@@ -447,10 +450,10 @@ public class TestHBaseSink {
   public void testWithoutConfigurationObject() throws Exception{
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
-      ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) );
+      ZKConfig.getZKQuorumServersString(conf) );
     System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM));
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, ctx);
@@ -475,7 +478,7 @@ public class TestHBaseSink {
       status = sink.process();
     }
     sink.stop();
-    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
@@ -490,6 +493,7 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
@@ -499,7 +503,7 @@ public class TestHBaseSink {
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, ctx);
     Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
@@ -516,7 +520,7 @@ public class TestHBaseSink {
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, ctx);
     Assert.fail();