Posted to commits@bigtop.apache.org by rv...@apache.org on 2011/10/04 04:08:14 UTC
svn commit: r1178674 - in
/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest:
hbasesmoke/ smoke/ system/ util/
Author: rvs
Date: Tue Oct 4 02:08:13 2011
New Revision: 1178674
URL: http://svn.apache.org/viewvc?rev=1178674&view=rev
Log:
Updating HBase tests
Added:
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/IncrementalPELoad.java
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseCompression.groovy
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBasePigSmoke.groovy
- copied, changed from r1178673, incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbasesmoke/TestHbasePigSmoke.groovy
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseSmoke.java
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHFileOutputFormat.java
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestLoadIncrementalHFiles.java
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/TestLoadAndVerify.java
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/HBaseTestUtil.java
Removed:
incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbasesmoke/TestHbasePigSmoke.groovy
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/IncrementalPELoad.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/IncrementalPELoad.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/IncrementalPELoad.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/IncrementalPELoad.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.smoke;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class IncrementalPELoad extends Configured implements Tool {
+
+ /**
+ * Simple mapper that makes KeyValue output.
+ */
+ public static class RandomKVGeneratingMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
+
+ private static final int ROWSPERSPLIT = 1024;
+ private static final byte[][] FAMILIES
+ = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+
+ private int keyLength;
+ private static final int KEYLEN_DEFAULT=10;
+ private static final String KEYLEN_CONF="randomkv.key.length";
+
+ private int valLength;
+ private static final int VALLEN_DEFAULT=10;
+ private static final String VALLEN_CONF="randomkv.val.length";
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ }
+
+ protected void map(NullWritable n1, NullWritable n2,
+ Mapper<NullWritable, NullWritable,
+ ImmutableBytesWritable,KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException
+ {
+
+ byte keyBytes[] = new byte[keyLength];
+ byte valBytes[] = new byte[valLength];
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+ Random random = new Random();
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+ random.nextBytes(valBytes);
+ ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+ for (byte[] family : FAMILIES) {
+ KeyValue kv = new KeyValue(keyBytes, family,
+ PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ context.write(key, kv);
+ }
+ }
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf(), "testMRIncrementalLoad");
+ job.setJarByClass(IncrementalPELoad.class);
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(IncrementalPELoad.RandomKVGeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ HTable table = new HTable(Bytes.toBytes(args[0]));
+ HFileOutputFormat.configureIncrementalLoad(job, table);
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new IncrementalPELoad(), args);
+ System.exit(exitCode);
+ }
+}
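
For orientation: IncrementalPELoad is a standard Hadoop Tool, so besides the "hadoop jar" invocation used by TestHFileOutputFormat further down, it can be driven in-process through ToolRunner. A minimal sketch, with a placeholder table name and output directory (not values from this commit):

    // Sketch only: programmatic equivalent of this class's main().
    // "myTable" and "/tmp/incr-out" are illustrative placeholders.
    int rc = ToolRunner.run(new Configuration(), new IncrementalPELoad(),
        new String[] { "myTable", "/tmp/incr-out" });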
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseCompression.groovy
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseCompression.groovy?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseCompression.groovy (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseCompression.groovy Tue Oct 4 02:08:13 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.smoke;
+
+import org.apache.hadoop.conf.Configuration
+
+import static org.junit.Assert.assertTrue
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+
+import org.apache.bigtop.itest.shell.Shell
+
+class TestHBaseCompression {
+ private static final String OUTPUT = "snappy-output";
+ private static final String TEST = "org.apache.hadoop.hbase.util.CompressionTest";
+ private static Configuration conf;
+ private static String HADOOP_OPTIONS;
+ private static String HDFS_PATH;
+ private static String JAVA_LIBRARY_PATH =
+ System.getProperty("snappy.lib.path", "");
+ private static Shell sh = new Shell('/bin/bash -s');
+
+ @BeforeClass
+ static void setUp() {
+ conf = new Configuration();
+ conf.addResource('mapred-site.xml');
+ HADOOP_OPTIONS =
+ "-fs ${conf.get('fs.default.name')} -jt ${conf.get('mapred.job.tracker')}";
+ sh.exec("whoami");
+ String user = sh.out[0];
+ HDFS_PATH = "${conf.get('fs.default.name')}/user/$user/$OUTPUT";
+ sh.exec("hadoop fs $HADOOP_OPTIONS -test -e $OUTPUT");
+ if (sh.getRet() == 0) {
+ sh.exec("hadoop fs $HADOOP_OPTIONS -rmr -skipTrash $OUTPUT");
+ assertTrue("Deletion of previous $OUTPUT from HDFS failed",
+ sh.getRet() == 0);
+ }
+ sh.exec("hadoop fs $HADOOP_OPTIONS -mkdir $OUTPUT");
+ assertTrue("Could not create $OUTPUT directory", sh.getRet() == 0);
+ }
+
+ @AfterClass
+ static void tearDown() {
+ sh.exec("hadoop fs $HADOOP_OPTIONS -test -e $OUTPUT");
+ if (sh.getRet() == 0) {
+ sh.exec("hadoop fs $HADOOP_OPTIONS -rmr -skipTrash $OUTPUT");
+ assertTrue("Deletion of $OUTPUT from HDFS failed",
+ sh.getRet() == 0);
+ }
+ }
+
+ void _testCompression(String codec) {
+ // workaround for hbase; set HBASE_LIBRARY_PATH
+ sh.exec("export HBASE_LIBRARY_PATH=$JAVA_LIBRARY_PATH",
+ "hbase $TEST $HDFS_PATH/testfile.$codec $codec");
+ assertTrue("test failed with codec: $codec", sh.getRet() == 0);
+ }
+
+ @Test
+ void testNoCompression() {
+ _testCompression("none");
+ }
+
+ @Test
+ void testGzipCompression() {
+ _testCompression("gz");
+ }
+
+ @Test
+ void testSnappyCompression() {
+ _testCompression("snappy");
+ }
+}
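
For orientation: the test above shells out to HBase's stock CompressionTest utility, which writes and re-reads a small test file at the given path with the given codec. The same check can be run in-process; a minimal sketch, with a placeholder HDFS path:

    // Sketch only: the check TestHBaseCompression performs via the hbase CLI.
    // The file path is an illustrative placeholder.
    org.apache.hadoop.hbase.util.CompressionTest.main(
        new String[] { "hdfs:///tmp/testfile.snappy", "snappy" });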
Copied: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBasePigSmoke.groovy (from r1178673, incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbasesmoke/TestHbasePigSmoke.groovy)
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBasePigSmoke.groovy?p2=incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBasePigSmoke.groovy&p1=incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbasesmoke/TestHbasePigSmoke.groovy&r1=1178673&r2=1178674&rev=1178674&view=diff
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbasesmoke/TestHbasePigSmoke.groovy (original)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBasePigSmoke.groovy Tue Oct 4 02:08:13 2011
@@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.bigtop.itest.hbasesmoke
+package org.apache.bigtop.itest.hbase.smoke
import org.apache.bigtop.itest.shell.Shell
import org.junit.AfterClass
@@ -25,19 +24,19 @@ import org.junit.Test
import static junit.framework.Assert.assertEquals
import static org.junit.Assert.assertTrue
-class TestHbasePigSmoke {
+class TestHBasePigSmoke {
private static final int ROW_CNT = 10;
- private static String extra_jars =
- System.getProperty("org.apache.bigtop.itest.hbasesmoke.TestHbasePigSmoke.extra_jars",
- "");
+ private static String extra_jars =
+ System.getProperty("org.apache.bigtop.itest.hbase.smoke.TestHBasePigSmoke.extra_jars",
+ "");
private static String register_clause = "";
- private static String tmp = "TestHbasePigSmoke-${(new Date().getTime())}";
+ private static String tmp = "TestHBasePigSmoke-${(new Date().getTime())}";
private static String TABLE="smoke-${tmp}";
private static String FAM1='family1';
private static String FAM2='family2';
- private static Shell shHbase = new Shell('hbase shell');
+ private static Shell shHBase = new Shell('hbase shell');
private static Shell shPig = new Shell('pig');
private static Shell sh = new Shell('/bin/bash -s');
@@ -49,16 +48,16 @@ class TestHbasePigSmoke {
@BeforeClass
static void setUp() {
- shHbase.exec("create '$TABLE', '$FAM1', '$FAM2'",
+ shHBase.exec("create '$TABLE', '$FAM1', '$FAM2'",
"describe '$TABLE'",
"quit\n");
assertEquals("Creating of the ${TABLE} failed",
- 0, shHbase.ret);
+ 0, shHBase.ret);
}
@AfterClass
static void tearDown() {
- shHbase.exec("disable '$TABLE'",
+ shHBase.exec("disable '$TABLE'",
"drop '$TABLE'",
"quit\n");
@@ -66,7 +65,7 @@ class TestHbasePigSmoke {
}
@Test(timeout=300000L)
- public void Pig2Hbase() {
+ public void Pig2HBase() {
def script = "\n";
(1..ROW_CNT).each { script <<= String.format('%020d %d %s\n', it, it, 'localhost') }
@@ -86,14 +85,14 @@ class TestHbasePigSmoke {
assertEquals("Failed loading data via PIG",
0, shPig.ret);
- shHbase.exec("scan '$TABLE'",
+ shHBase.exec("scan '$TABLE'",
"quit\n");
assertTrue("Scanning the table returned wrong # of rows",
- (shHbase.out.get(shHbase.out.size() - 3) =~ "^$ROW_CNT row.s. in .* seconds").find());
+ (shHBase.out.get(shHBase.out.size() - 3) =~ "^$ROW_CNT row.s. in .* seconds").find());
}
@Test(timeout=300000L)
- public void Hbase2Pig() {
+ public void HBase2Pig() {
def script = "\n";
(1..10).each {
@@ -102,7 +101,7 @@ class TestHbasePigSmoke {
}
script << "quit\n\n";
- shHbase.exec(script);
+ shHBase.exec(script);
shPig.exec("""
${register_clause}
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseSmoke.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseSmoke.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseSmoke.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHBaseSmoke.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.smoke;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+public class TestHBaseSmoke {
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
+ private static final byte[] TEST_VALUE = Bytes.toBytes("v1");
+
+ private static int NUM_ROWS = 100;
+
+ /**
+ * Test case that creates a table, writes a small number of rows,
+ * disables the table, and exits.
+ */
+ @Test
+ public void testSimplePutGet() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ HTableDescriptor htd =
+ HBaseTestUtil.createTestTableDescriptor("testSimplePutGet", TEST_FAMILY);
+ admin.createTable(htd);
+
+ byte[] tableName = htd.getName();
+ try {
+ HTable table = new HTable(conf, tableName);
+ // Write some rows
+ for (int i = 0; i < NUM_ROWS; i++) {
+ byte[] row = Bytes.toBytes("row_" + i);
+ Put p = new Put(row);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ p.add(hcd.getName(), TEST_QUALIFIER, TEST_VALUE);
+ }
+ table.put(p);
+ }
+
+ table.flushCommits();
+
+ // Read some rows
+ for (int i = 0; i < NUM_ROWS; i++) {
+ byte[] row = Bytes.toBytes("row_" + i);
+ Get g = new Get(row);
+ Result result = table.get(g);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ byte[] value = result.getValue(hcd.getName(), TEST_QUALIFIER);
+ Assert.assertArrayEquals(TEST_VALUE, value);
+ }
+ }
+ } finally {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+ }
+}
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHFileOutputFormat.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHFileOutputFormat.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestHFileOutputFormat.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.smoke;
+
+import java.net.URL;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.JarContent;
+import org.apache.bigtop.itest.shell.Shell;
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+public class TestHFileOutputFormat {
+ private static final int ROWSPERSPLIT = 1024;
+
+ private static final byte[][] FAMILIES =
+ { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+
+ private static final String HBASE_HOME = System.getenv("HBASE_HOME");
+ private static final String HBASE_CONF_DIR = System.getenv("HBASE_CONF_DIR");
+ static {
+ assertNotNull("HBASE_HOME has to be set to run this test",
+ HBASE_HOME);
+ assertNotNull("HBASE_CONF_DIR has to be set to run this test",
+ HBASE_CONF_DIR);
+ }
+ private static String hbase_jar =
+ JarContent.getJarName(HBASE_HOME, "hbase-.*(?<!tests).jar");
+ private static String hbase_tests_jar =
+ JarContent.getJarName(HBASE_HOME, "hbase-.*tests.jar");
+ private static URL incrload_jar_url =
+ JarContent.getJarURL(org.apache.bigtop.itest.hbase.smoke.IncrementalPELoad.class);
+ static {
+ assertNotNull("Can't find hbase.jar", hbase_jar);
+ assertNotNull("Can't find hbase-tests.jar", hbase_tests_jar);
+ assertNotNull("Can't find jar containing IncrementalPELoad class", incrload_jar_url);
+ }
+ private static final String HBASE_JAR = HBASE_HOME + "/" + hbase_jar;
+ private static final String HBASE_TESTS_JAR = HBASE_HOME + "/" + hbase_tests_jar;
+ private static final String ZOOKEEPER_JAR = HBASE_HOME + "/lib/zookeeper.jar";
+ private static final String INCRLOAD_JAR = incrload_jar_url.getFile();
+ private static final String INCRLOAD = "org.apache.bigtop.itest.hbase.smoke.IncrementalPELoad";
+ private static final String USER = System.getProperty("user.name");
+ private static Shell sh = new Shell("/bin/bash -s");
+
+ @AfterClass
+ public static void cleanUp() {
+ // delete junk from HDFS
+ sh.exec("hadoop fs -rmr -skipTrash /user/" + USER + "/partitions_*");
+ assertTrue("HDFS cleanup failed", sh.getRet() == 0);
+ }
+
+ @Test
+ public void testMRIncrementalLoad() throws Exception {
+ doIncrementalLoadTest("testMRIncrementalLoad", false);
+ }
+
+ @Test
+ public void testMRIncrementalLoadWithSplit() throws Exception {
+ doIncrementalLoadTest("testMRIncrementalLoadWithSplit", true);
+ }
+
+ private byte [][] generateRandomSplitKeys(int numKeys) {
+ Random random = new Random();
+ byte[][] ret = new byte[numKeys][];
+ for (int i = 0; i < numKeys; i++) {
+ ret[i] = PerformanceEvaluation.generateValue(random);
+ }
+ return ret;
+ }
+
+ private void doIncrementalLoadTest(String testName, boolean shouldChangeRegions)
+ throws Exception {
+ FileSystem fs = HBaseTestUtil.getClusterFileSystem();
+ Path testDir = HBaseTestUtil.getMROutputDir(testName);
+ byte[][] splitKeys = generateRandomSplitKeys(4);
+
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ final byte[] TABLE_NAME = HBaseTestUtil.getTestTableName(testName);
+ HTableDescriptor tbldesc = new HTableDescriptor(TABLE_NAME);
+ HColumnDescriptor coldesc1 = new HColumnDescriptor(FAMILIES[0]);
+ HColumnDescriptor coldesc2 = new HColumnDescriptor(FAMILIES[1]);
+ tbldesc.addFamily(coldesc1);
+ tbldesc.addFamily(coldesc2);
+ admin.createTable(tbldesc, splitKeys);
+ HTable table = new HTable(TABLE_NAME);
+ assertEquals("Should start with empty table",
+ 0, HBaseTestUtil.countRows(table));
+
+ // Generate the bulk load files
+ runIncrementalPELoad(Bytes.toString(TABLE_NAME), testDir.toString());
+ // This doesn't write into the table, just makes files
+ assertEquals("HFOF should not touch actual table",
+ 0, HBaseTestUtil.countRows(table));
+
+ // Make sure that a directory was created for every CF
+ int dir = 0;
+ for (FileStatus f : fs.listStatus(testDir)) {
+ for (byte[] family : FAMILIES) {
+ if (Bytes.toString(family).equals(f.getPath().getName())) {
+ ++dir;
+ }
+ }
+ }
+ assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+ // handle the split case
+ if (shouldChangeRegions) {
+ admin.disableTable(TABLE_NAME);
+ admin.deleteTable(TABLE_NAME);
+ byte[][] newSplitKeys = generateRandomSplitKeys(14);
+ admin.createTable(tbldesc, newSplitKeys);
+ }
+
+ // Before we can load the HFiles, we need to set the permissions so that
+ // HBase has write access to testDir's contents
+ chmod(testDir.toString());
+
+ // Perform the actual load
+ new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+
+ // Ensure data shows up
+ int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+ assertEquals("LoadIncrementalHFiles should put expected data in table",
+ expectedRows, HBaseTestUtil.countRows(table));
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ count++;
+ assertEquals(FAMILIES.length, res.raw().length);
+ KeyValue first = res.raw()[0];
+ for (KeyValue kv : res.raw()) {
+ assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+ assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+ }
+ }
+ results.close();
+ String tableDigestBefore = HBaseTestUtil.checksumRows(table);
+
+ // Cause regions to reopen
+ admin.disableTable(TABLE_NAME);
+ admin.enableTable(TABLE_NAME);
+ assertEquals("Data should remain after reopening of regions",
+ tableDigestBefore, HBaseTestUtil.checksumRows(table));
+
+ // cleanup
+ // - disable and drop table
+ admin.disableTable(TABLE_NAME);
+ admin.deleteTable(TABLE_NAME);
+ // - remove incremental load output
+ fs.delete(testDir, true);
+ }
+
+ private void chmod(String uri) {
+ sh.exec("hadoop fs -chmod -R 777 " + uri);
+ assertEquals("chmod failed", 0, sh.getRet());
+ }
+
+ private void runIncrementalPELoad(String table, String outDir) {
+ sh.exec("export HADOOP_CLASSPATH=" + HBASE_CONF_DIR + ":" + HBASE_JAR + ":" + HBASE_TESTS_JAR + ":" + ZOOKEEPER_JAR,
+ "hadoop jar " + INCRLOAD_JAR + " " + INCRLOAD +
+ " -libjars " + HBASE_JAR + "," + HBASE_TESTS_JAR +
+ " " + table + " " + outDir);
+ assertEquals("MR job failed", 0, sh.getRet());
+ }
+
+}
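
For orientation: the test exercises HBase's two-step bulk load. HFileOutputFormat.configureIncrementalLoad() wires the job's partitioner and reducer so every output HFile falls inside a single region of the target table, and LoadIncrementalHFiles.doBulkLoad() then moves the finished files into those regions. A condensed sketch of the same flow, with placeholder table and path names:

    // Sketch only: the bulk-load flow this test drives, condensed.
    // "myTable" and "/tmp/bulk-out" are illustrative placeholders.
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulk-load");
    HTable table = new HTable(conf, "myTable");
    HFileOutputFormat.configureIncrementalLoad(job, table); // align HFiles with regions
    FileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-out"));
    if (job.waitForCompletion(true)) {
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/bulk-out"), table);
    }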
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestLoadIncrementalHFiles.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestLoadIncrementalHFiles.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/smoke/TestLoadIncrementalHFiles.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.smoke;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+public class TestLoadIncrementalHFiles {
+ private static final byte[] FAMILY = Bytes.toBytes("f1");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q1");
+ private static final byte[][] SPLIT_KEYS = new byte[][] {
+ Bytes.toBytes("ddd"),
+ Bytes.toBytes("ppp")
+ };
+
+ /**
+ * Test case that creates some regions and loads
+ * HFiles that fit snugly inside those regions
+ */
+ @Test
+ public void testSimpleLoad() throws Exception {
+ runTest("testSimpleLoad",
+ new byte[][][] {
+ new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+ });
+ }
+
+ /**
+ * Test case that creates some regions and loads
+ * HFiles that cross the boundaries of those regions
+ */
+ @Test
+ public void testRegionCrossingLoad() throws Exception {
+ runTest("testRegionCrossingLoad",
+ new byte[][][] {
+ new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+ new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+ });
+ }
+
+ private void runTest(String testName, byte[][][] hfileRanges)
+ throws Exception {
+ FileSystem fs = HBaseTestUtil.getClusterFileSystem();
+ Path dir = HBaseTestUtil.getMROutputDir(testName);
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HBaseTestUtil.createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
+ FAMILY, QUALIFIER, from, to, 1000);
+ }
+ int expectedRows = hfileIdx * 1000;
+
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ final byte[] TABLE = HBaseTestUtil.getTestTableName(testName);
+ HTableDescriptor htd = new HTableDescriptor(TABLE);
+ htd.addFamily(new HColumnDescriptor(FAMILY));
+
+ admin.createTable(htd, SPLIT_KEYS);
+
+ HTable table = new HTable(conf, TABLE);
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ loader.doBulkLoad(dir, table);
+
+ Assert.assertEquals(expectedRows, HBaseTestUtil.countRows(table));
+
+ // disable and drop if we succeeded to verify
+ admin.disableTable(TABLE);
+ admin.deleteTable(TABLE);
+ fs.delete(dir, true);
+ }
+
+}
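
For orientation: with the two SPLIT_KEYS above, createTable brings the table up with three regions, (-inf,"ddd"), ["ddd","ppp") and ["ppp",+inf). The HFiles in testSimpleLoad ("aaaa".."cccc", "ddd".."ooo") each fit inside a single region, while those in testRegionCrossingLoad straddle region boundaries, so doBulkLoad has to split them on the way in; that splitting is exactly the behavioral difference the two cases cover.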
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/TestLoadAndVerify.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/TestLoadAndVerify.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/TestLoadAndVerify.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/system/TestLoadAndVerify.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+public class TestLoadAndVerify {
+ private static final String TEST_NAME = "TestLoadAndVerify";
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
+
+ private static final String NUM_TO_WRITE_KEY =
+ "loadmapper.num_to_write";
+ private static final long NUM_TO_WRITE_DEFAULT = 100*1000;
+
+ private static final String TABLE_NAME_KEY = "loadmapper.table";
+ private static final String TABLE_NAME_DEFAULT = "table";
+
+ private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
+ private static final int NUM_BACKREFS_DEFAULT = 50;
+
+ private static final int NUM_TASKS = 200;
+ private static final int NUM_REDUCE_TASKS = 35;
+
+ private static final int SCANNER_CACHING = 5000;
+
+ private enum Counters {
+ ROWS_WRITTEN,
+ REFERENCES_WRITTEN,
+ REFERENCES_CHECKED;
+ }
+
+ /**
+ * Converts a "long" value between endian systems.
+ * Borrowed from Apache Commons IO
+ * @param value value to convert
+ * @return the converted value
+ */
+ public static long swapLong(long value)
+ {
+ return
+ ( ( ( value >> 0 ) & 0xff ) << 56 ) +
+ ( ( ( value >> 8 ) & 0xff ) << 48 ) +
+ ( ( ( value >> 16 ) & 0xff ) << 40 ) +
+ ( ( ( value >> 24 ) & 0xff ) << 32 ) +
+ ( ( ( value >> 32 ) & 0xff ) << 24 ) +
+ ( ( ( value >> 40 ) & 0xff ) << 16 ) +
+ ( ( ( value >> 48 ) & 0xff ) << 8 ) +
+ ( ( ( value >> 56 ) & 0xff ) << 0 );
+ }
+
+ public static class LoadMapper
+ extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
+ {
+ private long recordsToWrite;
+ private HTable table;
+ private Configuration conf;
+ private int numBackReferencesPerRow;
+ private String shortTaskId;
+
+ private Random rand = new Random();
+
+ private Counter rowsWritten, refsWritten;
+
+ @Override
+ public void setup(Context context) throws IOException {
+ conf = context.getConfiguration();
+ recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
+ String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
+ numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
+ table = new HTable(conf, tableName);
+ table.setWriteBufferSize(4*1024*1024);
+ table.setAutoFlush(false);
+
+ String taskId = conf.get("mapred.task.id");
+ Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
+ if (!matcher.matches()) {
+ throw new RuntimeException("Strange task ID: " + taskId);
+ }
+ shortTaskId = matcher.group(1);
+
+ rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
+ refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
+ }
+
+ @Override
+ public void cleanup(Context context) throws IOException {
+ table.flushCommits();
+ table.close();
+ }
+
+ @Override
+ protected void map(NullWritable key, NullWritable value,
+ Context context) throws IOException, InterruptedException {
+
+ String suffix = "/" + shortTaskId;
+ byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));
+
+ int BLOCK_SIZE = (int)(recordsToWrite / 100);
+
+ for (long i = 0; i < recordsToWrite;) {
+ long blockStart = i;
+ for (long idxInBlock = 0;
+ idxInBlock < BLOCK_SIZE && i < recordsToWrite;
+ idxInBlock++, i++) {
+
+ long byteSwapped = swapLong(i);
+ Bytes.putLong(row, 0, byteSwapped);
+
+ Put p = new Put(row);
+ p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
+ if (blockStart > 0) {
+ for (int j = 0; j < numBackReferencesPerRow; j++) {
+ long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
+ Bytes.putLong(row, 0, swapLong(referredRow));
+ p.add(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
+ }
+ refsWritten.increment(1);
+ }
+ rowsWritten.increment(1);
+ table.put(p);
+
+ if (i % 100 == 0) {
+ context.setStatus("Written " + i + "/" + recordsToWrite + " records");
+ context.progress();
+ }
+ }
+ // End of block, flush all of them before we start writing anything
+ // pointing to these!
+ table.flushCommits();
+ }
+ }
+ }
+
+ public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
+ static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ BytesWritable bwKey = new BytesWritable(key.get());
+ BytesWritable bwVal = new BytesWritable();
+ for (KeyValue kv : value.list()) {
+ if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
+ kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
+ context.write(bwKey, EMPTY);
+ } else {
+ bwVal.set(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+ context.write(bwVal, bwKey);
+ }
+ }
+ }
+ }
+
+ public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
+ private Counter refsChecked;
+
+ @Override
+ public void setup(Context context) throws IOException {
+ refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
+ }
+
+ @Override
+ protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
+ VerifyReducer.Context ctx) throws IOException, InterruptedException {
+ boolean gotOriginalRow = false;
+ int refCount = 0;
+
+ for (BytesWritable ref : referrers) {
+ if (ref.getLength() == 0) {
+ assert !gotOriginalRow;
+ gotOriginalRow = true;
+ } else {
+ refCount++;
+ }
+ }
+ refsChecked.increment(refCount);
+
+ if (!gotOriginalRow) {
+ String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
+ String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
+ ctx.write(new Text(binRow), new Text(parsedRow));
+ }
+ }
+
+ private String makeRowReadable(byte[] bytes, int length) {
+ long rowIdx = swapLong(Bytes.toLong(bytes, 0));
+ String suffix = Bytes.toString(bytes, 8, length - 8);
+
+ return "Row #" + rowIdx + " suffix " + suffix;
+ }
+ }
+
+ private void doLoad(HTableDescriptor htd) throws Exception {
+ Path outputDir =
+ new Path(HBaseTestUtil.getMROutputDir(TEST_NAME),
+ "load-output");
+
+ Configuration conf = HBaseConfiguration.create();
+ NMapInputFormat.setNumMapTasks(conf, NUM_TASKS);
+ conf.set(TABLE_NAME_KEY, htd.getNameAsString());
+
+ Job job = new Job(conf);
+ job.setJobName(TEST_NAME + " Load for " + htd.getNameAsString());
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(LoadMapper.class);
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setNumReduceTasks(0);
+ FileOutputFormat.setOutputPath(job, outputDir);
+
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJars(
+ job.getConfiguration(), HTable.class, Lists.class);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ private void doVerify(HTableDescriptor htd) throws Exception {
+ Path outputDir =
+ new Path(HBaseTestUtil.getMROutputDir(TEST_NAME),
+ "verify-output");
+
+ Configuration conf = HBaseConfiguration.create();
+ Job job = new Job(conf);
+ job.setJarByClass(this.getClass());
+ job.setJobName(TEST_NAME + " Verification for " + htd.getNameAsString());
+
+ Scan scan = new Scan();
+
+ TableMapReduceUtil.initTableMapperJob(
+ htd.getNameAsString(), scan, VerifyMapper.class,
+ BytesWritable.class, BytesWritable.class, job);
+ TableMapReduceUtil.setScannerCaching(job, SCANNER_CACHING);
+
+ job.setReducerClass(VerifyReducer.class);
+ job.setNumReduceTasks(NUM_REDUCE_TASKS);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ assertTrue(job.waitForCompletion(true));
+
+ long numOutputRecords = job.getCounters().findCounter(
+ org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS).getValue();
+ assertEquals(0, numOutputRecords);
+ }
+
+ @Test
+ public void testLoadAndVerify() throws Exception {
+ HTableDescriptor htd =
+ HBaseTestUtil.createTestTableDescriptor(TEST_NAME, TEST_FAMILY);
+ HBaseAdmin admin = HBaseTestUtil.getAdmin();
+ int numPreCreate = 40;
+ admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPreCreate);
+
+ doLoad(htd);
+ doVerify(htd);
+
+ // Only disable and drop if we succeeded to verify - otherwise it's useful
+ // to leave it around for post-mortem
+ admin.disableTable(htd.getName());
+ admin.deleteTable(htd.getName());
+ }
+}
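
For orientation: the load/verify invariant rests on swapLong being a self-inverse byte reversal, so sequential record indices scatter across the keyspace as row keys yet remain recoverable. A worked example, computed from the swapLong definition above:

    // Worked example of swapLong (values follow from the shifts above):
    long k = TestLoadAndVerify.swapLong(1L);    // == 0x0100000000000000L
    long back = TestLoadAndVerify.swapLong(k);  // == 1L; byte reversal undoes itself
    // Each row additionally stores back-references to rows of the previous
    // block; the verify job re-emits those references and fails if any
    // referred row is missing from the table.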
Added: incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/HBaseTestUtil.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/HBaseTestUtil.java?rev=1178674&view=auto
==============================================================================
--- incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/HBaseTestUtil.java (added)
+++ incubator/bigtop/branches/hadoop-0.22/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/util/HBaseTestUtil.java Tue Oct 4 02:08:13 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.util;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBaseTestUtil {
+
+ public static int BLOCKSIZE = 64*1024;
+ public static String COMPRESSION =
+ Compression.Algorithm.NONE.getName();
+
+ private static String getTestPrefix() {
+ return String.valueOf(System.currentTimeMillis());
+ }
+
+ public static byte[] getTestTableName(String testName) {
+ return Bytes.toBytes(testName + "_" + getTestPrefix());
+ }
+
+ public static HTableDescriptor createTestTableDescriptor(String testName,
+ byte[] familyName) {
+ byte[] tableName = getTestTableName(testName);
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(familyName));
+ return htd;
+ }
+
+ public static HBaseAdmin getAdmin()
+ throws MasterNotRunningException, ZooKeeperConnectionException {
+ return new HBaseAdmin(HBaseConfiguration.create());
+ }
+
+ public static FileSystem getClusterFileSystem() throws IOException {
+ return FileSystem.get(new Configuration());
+ }
+
+ public static Path getMROutputDir(String testName) throws IOException {
+ Path p = new Path("/tmp/" + testName + "_" + getTestPrefix());
+ return p.makeQualified(getClusterFileSystem());
+ }
+
+ /**
+ * Create an HFile with the given number of rows between a given
+ * start key and end key.
+ */
+ public static void createHFile(
+ FileSystem fs, Path path,
+ byte[] family, byte[] qualifier,
+ byte[] startKey, byte[] endKey, int numRows) throws IOException
+ {
+ HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
+ KeyValue.KEY_COMPARATOR);
+ long now = System.currentTimeMillis();
+ try {
+ // subtract 2 since iterateOnSplits doesn't include boundary keys
+ for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+ KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+ writer.append(kv);
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Return the number of rows in the given table.
+ */
+ public static int countRows(final HTable table) throws IOException {
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (@SuppressWarnings("unused") Result res : results) {
+ count++;
+ }
+ results.close();
+ return count;
+ }
+
+ /**
+ * Return an md5 digest of the entire contents of a table.
+ */
+ public static String checksumRows(final HTable table) throws Exception {
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ for (Result res : results) {
+ digest.update(res.getRow());
+ }
+ results.close();
+ return Bytes.toStringBinary(digest.digest()); // digest(), not toString(), yields the MD5
+ }
+}
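
For orientation: taken together these helpers give every test a throwaway table whose name embeds a creation timestamp, so reruns do not collide. A minimal usage sketch, with placeholder table and family names:

    // Sketch only: typical use of HBaseTestUtil by the tests above.
    // "demo" and "f1" are illustrative placeholders.
    HTableDescriptor htd = HBaseTestUtil.createTestTableDescriptor(
        "demo", Bytes.toBytes("f1"));           // table name: "demo_<millis>"
    HBaseTestUtil.getAdmin().createTable(htd);
    HTable table = new HTable(HBaseConfiguration.create(), htd.getName());
    int rows = HBaseTestUtil.countRows(table);  // full-scan row count, 0 here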