You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/19 18:22:25 UTC
svn commit: r1328032 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Author: tedyu
Date: Thu Apr 19 16:22:24 2012
New Revision: 1328032
URL: http://svn.apache.org/viewvc?rev=1328032&view=rev
Log:
HBASE-5741 ImportTsv does not check for table existence (Himanshu)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1328032&r1=1328031&r2=1328032&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu Apr 19 16:22:24 2012
@@ -23,11 +23,16 @@ import org.apache.hadoop.hbase.util.Base
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -61,6 +66,7 @@ public class ImportTsv {
final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
final static String DEFAULT_SEPARATOR = "\t";
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+ private static HBaseAdmin hbaseAdmin;
static class TsvParser {
/**
@@ -217,6 +223,9 @@ public class ImportTsv {
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
+ if (!doesTableExist(tableName)) {
+ createTable(conf, tableName);
+ }
HTable table = new HTable(conf, tableName);
job.setReducerClass(PutSortReducer.class);
Path outputDir = new Path(hfileOutPath);
@@ -237,6 +246,27 @@ public class ImportTsv {
return job;
}
+ private static boolean doesTableExist(String tableName) throws IOException {
+ return hbaseAdmin.tableExists(tableName.getBytes());
+ }
+
+ private static void createTable(Configuration conf, String tableName)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
+ String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+ Set<String> cfSet = new HashSet<String>();
+ for (String aColumn : columns) {
+ if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
+ // keep only the column family, i.e. the text before ':' when the spec is cf:cq
+ cfSet.add(aColumn.split(":", 2)[0]);
+ }
+ for (String cf : cfSet) {
+ HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+ htd.addFamily(hcd);
+ }
+ hbaseAdmin.createTable(htd);
+ }
+
/*
* @param errorMsg Error message. Can be null.
*/
@@ -275,6 +305,14 @@ public class ImportTsv {
}
/**
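+ * Creates the static HBaseAdmin. Used only by tests, which do not go through main().
+ * @param conf configuration used to construct the HBaseAdmin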
+ */
+ static void createHbaseAdmin(Configuration conf) throws IOException {
+ hbaseAdmin = new HBaseAdmin(conf);
+ }
+
+ /**
* Main entry point.
*
* @param args The command line parameters.
@@ -311,7 +349,7 @@ public class ImportTsv {
usage("One or more columns in addition to the row key are required");
System.exit(-1);
}
-
+ hbaseAdmin = new HBaseAdmin(conf);
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1328032&r1=1328031&r2=1328032&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu Apr 19 16:22:24 2012
@@ -204,11 +204,13 @@ public class TestImportTsv {
final byte[] TAB = Bytes.toBytes(tableName);
final byte[] QA = Bytes.toBytes("A");
final byte[] QB = Bytes.toBytes("B");
-
- HTableDescriptor desc = new HTableDescriptor(TAB);
- desc.addFamily(new HColumnDescriptor(FAM));
- new HBaseAdmin(conf).createTable(desc);
-
+ if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+ HTableDescriptor desc = new HTableDescriptor(TAB);
+ desc.addFamily(new HColumnDescriptor(FAM));
+ new HBaseAdmin(conf).createTable(desc);
+ } else { // set the hbaseAdmin as we are not going through main()
+ ImportTsv.createHbaseAdmin(conf);
+ }
Job job = ImportTsv.createSubmittableJob(conf, args);
job.waitForCompletion(false);
assertTrue(job.isSuccessful());
@@ -255,6 +257,21 @@ public class TestImportTsv {
}
}
+ @Test
+ public void testBulkOutputWithoutAnExistingTable() throws Exception {
+ String TABLE_NAME = "TestTable";
+ String FAMILY = "FAM";
+ String INPUT_FILE = "InputFile2.esv";
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
+ INPUT_FILE };
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+ }
+
public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
return new String(bytes, HConstants.UTF8_ENCODING);
}