Posted to commits@hbase.apache.org by te...@apache.org on 2014/01/29 03:50:39 UTC
svn commit: r1562342 - in /hbase/branches/0.98/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/Import.java
test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Author: tedyu
Date: Wed Jan 29 02:50:38 2014
New Revision: 1562342
URL: http://svn.apache.org/r1562342
Log:
HBASE-10416 Improvements to the import flow
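The headline user-facing change is a new import.wal.durability option that controls how the
import writes to the Write Ahead Log. A hypothetical invocation (the table name and export
path below are placeholders, not part of this commit):

  $ bin/hbase org.apache.hadoop.hbase.mapreduce.Import \
      -Dimport.wal.durability=SKIP_WAL \
      mytable /export/mytable

When SKIP_WAL is used, a new post-job step flushes all regions of the target table, since the
imported data would otherwise live only in memory and could be lost in a crash.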
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1562342&r1=1562341&r2=1562342&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Jan 29 02:50:38 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.zookeeper.KeeperException;
+
/**
* Import data written by {@link Export}.
*/
@@ -65,40 +68,38 @@ import org.apache.zookeeper.KeeperExcept
public class Import {
private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
- final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
- final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
- final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
- final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
-
- // Optional filter to use for mappers
- private static Filter filter;
+ public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+ public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+ public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+ public final static String TABLE_NAME = "import.table.name";
+ public final static String WAL_DURABILITY = "import.wal.durability";
/**
* A mapper that just writes out KeyValues.
*/
- static class KeyValueImporter
- extends TableMapper<ImmutableBytesWritable, KeyValue> {
+ public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
private Map<byte[], byte[]> cfRenameMap;
-
+ private Filter filter;
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
- * org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result value,
Context context)
throws IOException {
try {
- for (Cell kv : value.rawCells()) {
- kv = filterKv(kv);
- // skip if we filtered it out
- if (kv == null) continue;
- // TODO get rid of ensureKeyValue
- context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+ if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
+ for (Cell kv : value.rawCells()) {
+ kv = filterKv(filter, kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+ // TODO get rid of ensureKeyValue
+ context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+ }
}
} catch (InterruptedException e) {
e.printStackTrace();
@@ -115,18 +116,17 @@ public class Import {
/**
* Write table content out to files in hdfs.
*/
- static class Importer
- extends TableMapper<ImmutableBytesWritable, Mutation> {
+ public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
private List<UUID> clusterIds;
+ private Filter filter;
+ private Durability durability;
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
- * org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result value,
@@ -143,32 +143,40 @@ public class Import {
throws IOException, InterruptedException {
Put put = null;
Delete delete = null;
- for (Cell kv : result.rawCells()) {
- kv = filterKv(kv);
- // skip if we filter it out
- if (kv == null) continue;
-
- kv = convertKv(kv, cfRenameMap);
- // Deletes and Puts are gathered and written when finished
- if (CellUtil.isDelete(kv)) {
- if (delete == null) {
- delete = new Delete(key.get());
+ if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
+ for (Cell kv : result.rawCells()) {
+ kv = filterKv(filter, kv);
+ // skip if we filter it out
+ if (kv == null) continue;
+
+ kv = convertKv(kv, cfRenameMap);
+ // Deletes and Puts are gathered and written when finished
+ if (CellUtil.isDelete(kv)) {
+ if (delete == null) {
+ delete = new Delete(key.get());
+ }
+ delete.addDeleteMarker(kv);
+ } else {
+ if (put == null) {
+ put = new Put(key.get());
+ }
+ put.add(kv);
}
- delete.addDeleteMarker(kv);
- } else {
- if (put == null) {
- put = new Put(key.get());
+ }
+ if (put != null) {
+ if (durability != null) {
+ put.setDurability(durability);
}
- put.add(kv);
+ put.setClusterIds(clusterIds);
+ context.write(key, put);
+ }
+ if (delete != null) {
+ if (durability != null) {
+ delete.setDurability(durability);
+ }
+ delete.setClusterIds(clusterIds);
+ context.write(key, delete);
}
- }
- if (put != null) {
- put.setClusterIds(clusterIds);
- context.write(key, put);
- }
- if (delete != null) {
- delete.setClusterIds(clusterIds);
- context.write(key, delete);
}
}
@@ -177,6 +185,10 @@ public class Import {
Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
+ String durabilityStr = conf.get(WAL_DURABILITY);
+ if (durabilityStr != null) {
+ durability = Durability.valueOf(durabilityStr.toUpperCase());
+ }
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
ZooKeeperWatcher zkw = null;
try {
@@ -201,18 +213,19 @@ public class Import {
* @return the filter to use for the task, or <tt>null</tt> if no filter should be used
* @throws IllegalArgumentException if the filter is misconfigured
*/
- private static Filter instantiateFilter(Configuration conf) {
- // get the filter, if it was configured
+ public static Filter instantiateFilter(Configuration conf) {
+ // get the filter, if it was configured
Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
if (filterClass == null) {
LOG.debug("No configured filter class, accepting all keyvalues.");
return null;
}
LOG.debug("Attempting to create filter:" + filterClass);
-
+ String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY, new String[0]);
+ ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
try {
Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
- return (Filter) m.invoke(null, getFilterArgs(conf));
+ return (Filter) m.invoke(null, quotedArgs);
} catch (IllegalAccessException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
@@ -231,15 +244,14 @@ public class Import {
}
}
- private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
- ArrayList<byte[]> args = new ArrayList<byte[]>();
- String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
- for (String arg : sargs) {
+ private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
+ ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
+ for (String stringArg : stringArgs) {
// all the filters' instantiation methods expect quoted args since they are coming from
- // the shell, so add them here, though its shouldn't really be needed :-/
- args.add(Bytes.toBytes("'" + arg + "'"));
+ // the shell, so add them here, though it shouldn't really be needed :-/
+ quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
}
- return args;
+ return quotedArgs;
}
/**
@@ -248,7 +260,7 @@ public class Import {
* @return <tt>null</tt> if the key should not be written, otherwise returns the original
* {@link KeyValue}
*/
- private static Cell filterKv(Cell kv) throws IOException {
+ public static Cell filterKv(Filter filter, Cell kv) throws IOException {
// apply the filter and skip this kv if the filter doesn't apply
if (filter != null) {
Filter.ReturnCode code = filter.filterKeyValue(kv);
@@ -347,22 +359,12 @@ public class Import {
* Add a Filter to be instantiated on import
* @param conf Configuration to update (will be passed to the job)
* @param clazz {@link Filter} subclass to instantiate on the server.
- * @param args List of arguments to pass to the filter on instantiation
+ * @param filterArgs List of arguments to pass to the filter on instantiation
*/
public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
- List<String> args) {
+ List<String> filterArgs) throws IOException {
conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
-
- // build the param string for the key
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < args.size(); i++) {
- String arg = args.get(i);
- builder.append(arg);
- if (i != args.size() - 1) {
- builder.append(",");
- }
- }
- conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+ conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
}
/**
@@ -375,6 +377,7 @@ public class Import {
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
+ conf.set(TABLE_NAME, tableName);
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(Importer.class);
@@ -430,12 +433,42 @@ public class Import {
System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ CF_RENAME_PROP + " property. Futher, filters will only use the"
- + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
- + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
- + "the KeyValue.");
+ + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
+ + " whether the current row needs to be ignored completely for processing and "
+ + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+ + " the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
- + " -Dmapred.reduce.tasks.speculative.execution=false");
+ + " -Dmapred.reduce.tasks.speculative.execution=false\n"
+ + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+ +" Allowed values are the supported durability values"
+ +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
+ }
+
+ /**
+ * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase,
+ * all the regions of the table need to be flushed, because the data is held only in memory
+ * and is not present in the Write Ahead Log to replay in case of a crash. This method flushes
+ * all the regions of the table after an import that ran with {@link Durability#SKIP_WAL}.
+ */
+ public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
+ InterruptedException {
+ String tableName = conf.get(TABLE_NAME);
+ HBaseAdmin hAdmin = null;
+ String durability = conf.get(WAL_DURABILITY);
+ // Need to flush if the data is written to hbase and skip wal is enabled.
+ if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
+ && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
+ try {
+ hAdmin = new HBaseAdmin(conf);
+ hAdmin.flush(tableName);
+ } finally {
+ if (hAdmin != null) {
+ hAdmin.close();
+ }
+ }
+ }
}
/**
@@ -456,6 +489,11 @@ public class Import {
conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
}
Job job = createSubmittableJob(conf, otherArgs);
+ boolean isJobSuccessful = job.waitForCompletion(true);
+ if (isJobSuccessful) {
+ // Flush all the regions of the table
+ flushRegionsIfNecessary(conf);
+ }
- System.exit(job.waitForCompletion(true) ? 0 : 1);
+ System.exit(isJobSuccessful ? 0 : 1);
}
}
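With the constants and helper methods now public, the import can also be driven from
application code. Below is a minimal sketch against the 0.98 API as modified above; the table
name, output path, and PrefixFilter argument are placeholders, not part of this commit:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.mapreduce.Job;

public class ImportDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Skip the WAL for write speed; the regions must then be flushed after the
    // job so the imported data survives a region server crash.
    conf.set(Import.WAL_DURABILITY, Durability.SKIP_WAL.name());
    // Only import rows whose keys start with "row" (placeholder argument).
    Import.addFilterAndArguments(conf, PrefixFilter.class, Arrays.asList("row"));
    // "mytable" and "/export/mytable" are placeholders.
    Job job = Import.createSubmittableJob(conf, new String[] { "mytable", "/export/mytable" });
    if (job.waitForCompletion(true)) {
      // No-op unless SKIP_WAL was requested and no bulk output directory is set.
      Import.flushRegionsIfNecessary(conf);
    }
  }
}

Note that createSubmittableJob() records the table name under the new import.table.name key,
which is how flushRegionsIfNecessary() later finds the table to flush.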
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1562342&r1=1562341&r2=1562342&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Wed Jan 29 02:50:38 2014
@@ -40,11 +40,13 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -56,6 +58,10 @@ import org.apache.hadoop.hbase.filter.Fi
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
@@ -353,6 +359,13 @@ public class TestImportExport {
p.add(FAMILYA, QUAL, now + 4, QUAL);
exportTable.put(p);
+ // Add a second row so that the filter is actually exercised.
+ p = new Put(ROW2);
+ p.add(FAMILYA, QUAL, now, QUAL);
+ exportTable.put(p);
+ // Flush the commits.
+ exportTable.flushCommits();
+
// Export the simple table
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
assertTrue(runExport(args));
@@ -512,7 +525,7 @@ public class TestImportExport {
* parameters into Configuration
*/
@Test
- public void testAddFilterAndArguments() {
+ public void testAddFilterAndArguments() throws IOException {
Configuration configuration = new Configuration();
List<String> args = new ArrayList<String>();
@@ -524,4 +537,120 @@ public class TestImportExport {
configuration.get(Import.FILTER_CLASS_CONF_KEY));
assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
}
-}
+
+ @Test
+ public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
+ // Create an export table.
+ String exportTableName = "exporttestDurability";
+ HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);
+
+ // Insert some data
+ Put put = new Put(ROW1);
+ put.add(FAMILYA, QUAL, now, QUAL);
+ put.add(FAMILYA, QUAL, now + 1, QUAL);
+ put.add(FAMILYA, QUAL, now + 2, QUAL);
+ exportTable.put(put);
+
+ put = new Put(ROW2);
+ put.add(FAMILYA, QUAL, now, QUAL);
+ put.add(FAMILYA, QUAL, now + 1, QUAL);
+ put.add(FAMILYA, QUAL, now + 2, QUAL);
+ exportTable.put(put);
+
+ // Run the export
+ String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
+ assertTrue(runExport(args));
+
+ // Create the table for import
+ String importTableName = "importTestDurability1";
+ HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
+
+ // Register the hlog listener for the import table
+ TableWALActionListener walListener = new TableWALActionListener(importTableName);
+ HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
+ hLog.registerWALActionsListener(walListener);
+
+ // Run the import with SKIP_WAL
+ args =
+ new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
+ importTableName, FQ_OUTPUT_DIR };
+ assertTrue(runImport(args));
+ // Assert that the WAL is not visited
+ assertTrue(!walListener.isWALVisited());
+ // Ensure that the count is 2 (only one version of each KeyValue is obtained)
+ assertTrue(getCount(importTable, null) == 2);
+
+ // Run the import with the default durability option
+ importTableName = "importTestDurability2";
+ importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
+ hLog.unregisterWALActionsListener(walListener);
+ walListener = new TableWALActionListener(importTableName);
+ hLog.registerWALActionsListener(walListener);
+ args = new String[] { importTableName, FQ_OUTPUT_DIR };
+ assertTrue(runImport(args));
+ // Assert that the WAL is visited
+ assertTrue(walListener.isWALVisited());
+ // Ensure that the count is 2 (only one version of each KeyValue is obtained)
+ assertTrue(getCount(importTable, null) == 2);
+ }
+
+ /**
+ * A {@link WALActionsListener} that uses
+ * {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} to detect whether
+ * an entry is written to the Write Ahead Log for the given table.
+ */
+ private static class TableWALActionListener implements WALActionsListener {
+
+ private String tableName;
+ private boolean isVisited = false;
+
+ public TableWALActionListener(String tableName) {
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void logRollRequested() {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void logCloseRequested() {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
+ // Not interested in this method.
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+ if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
+ isVisited = true;
+ }
+ }
+
+ public boolean isWALVisited() {
+ return isVisited;
+ }
+ }
+}
\ No newline at end of file
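To exercise the new durability path on its own, the modified test can be run in isolation;
this assumes the branch's usual Maven/Surefire setup:

  $ cd hbase-server
  $ mvn test -Dtest=TestImportExport

The testDurability case asserts both directions: with SKIP_WAL the listener must never see
the import table in the WAL, while with the default durability it must.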