Posted to commits@hbase.apache.org by jy...@apache.org on 2013/01/30 23:33:55 UTC
svn commit: r1440717 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/Import.java
test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Author: jyates
Date: Wed Jan 30 22:33:55 2013
New Revision: 1440717
URL: http://svn.apache.org/viewvc?rev=1440717&view=rev
Log:
HBASE-7702: Add filtering to Import jobs
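
For example, to import only rows starting with a given prefix (an illustrative
invocation; the table name, input directory, and prefix are placeholders):

  $ bin/hbase org.apache.hadoop.hbase.mapreduce.Import \
      -Dimport.filter.class=org.apache.hadoop.hbase.filter.PrefixFilter \
      -Dimport.filter.args=row1 \
      mytable /export/output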
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1440717&r1=1440716&r2=1440717&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Jan 30 22:33:55 2013
@@ -19,20 +19,31 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
@@ -50,10 +61,15 @@ import org.apache.zookeeper.KeeperExcept
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Import {
+ private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
- private static final Log LOG = LogFactory.getLog(Import.class);
+ final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+ final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+ // Optional filter to use for mappers
+ private static Filter filter;
/**
* A mapper that just writes out KeyValues.
@@ -76,6 +92,10 @@ public class Import {
throws IOException {
try {
for (KeyValue kv : value.raw()) {
+ kv = filterKv(kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+
context.write(row, convertKv(kv, cfRenameMap));
}
} catch (InterruptedException e) {
@@ -86,6 +106,7 @@ public class Import {
@Override
public void setup(Context context) {
cfRenameMap = createCfRenameMap(context.getConfiguration());
+ filter = instantiateFilter(context.getConfiguration());
}
}
@@ -121,6 +142,10 @@ public class Import {
Put put = null;
Delete delete = null;
for (KeyValue kv : result.raw()) {
+ kv = filterKv(kv);
+ // skip if we filter it out
+ if (kv == null) continue;
+
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (kv.isDelete()) {
@@ -149,6 +174,8 @@ public class Import {
public void setup(Context context) {
Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf);
+ filter = instantiateFilter(conf);
+
try {
HConnection connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -165,6 +192,77 @@ public class Import {
}
}
+ /**
+ * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}),
+ * optionally excluding them from the job output
+ * @param conf {@link Configuration} from which to load the filter
+ * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+ * @throws IllegalArgumentException if the filter is misconfigured
+ */
+ private static Filter instantiateFilter(Configuration conf) {
+ // get the filter, if it was configured
+ Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filterClass == null) {
+ LOG.debug("No configured filter class, accepting all keyvalues.");
+ return null;
+ }
+ LOG.debug("Attempting to create filter:" + filterClass);
+
+ try {
+ Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+ return (Filter) m.invoke(null, getFilterArgs(conf));
+ } catch (IllegalAccessException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (SecurityException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+ ArrayList<byte[]> args = new ArrayList<byte[]>();
+ String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+ // no args configured - hand the filter's factory method an empty list
+ if (sargs == null) return args;
+ for (String arg : sargs) {
+ // all the filters' instantiation methods expect quoted args since they are coming from
+ // the shell, so add the quotes here, though it shouldn't really be needed :-/
+ args.add(Bytes.toBytes("'" + arg + "'"));
+ }
+ return args;
+ }
+
+ /**
+ * Attempt to filter out the keyvalue
+ * @param kv {@link KeyValue} on which to apply the filter
+ * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+ * {@link KeyValue}
+ */
+ private static KeyValue filterKv(KeyValue kv) {
+ // apply the filter and skip this kv if the filter doesn't apply
+ if (filter != null) {
+ Filter.ReturnCode code = filter.filterKeyValue(kv);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter returned: " + code);
+ }
+ // if it's not an accept type, then skip this kv
+ if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+ .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping key: " + kv + " from filter decision: " + code);
+ }
+ return null;
+ }
+ }
+ return kv;
+ }
+
// helper: create a new KeyValue based on CF rename map
private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
if(cfRenameMap != null) {
@@ -244,13 +342,33 @@ public class Import {
}
conf.set(CF_RENAME_PROP, sb.toString());
}
-
+
+ /**
+ * Add a Filter to be instantiated on import
+ * @param conf Configuration to update (will be passed to the job)
+ * @param clazz {@link Filter} subclass to instantiate in the import job's map tasks.
+ * @param args List of arguments to pass to the filter on instantiation
+ */
+ public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+ List<String> args) {
+ conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+ // build the param string for the key
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < args.size(); i++) {
+ String arg = args.get(i);
+ builder.append(arg);
+ if (i != args.size() - 1) {
+ builder.append(",");
+ }
+ }
+ conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+ }
/**
* Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
+ * @param conf The current configuration.
+ * @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
@@ -263,6 +381,17 @@ public class Import {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+ // make sure we get the filter in the jars
+ try {
+ Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filter != null) {
+ TableMapReduceUtil.addDependencyJars(conf, filter);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName);
@@ -295,6 +424,15 @@ public class Import {
System.err.println("By default Import will load data directly into HBase. To instead generate");
System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err
+ .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+ System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+ System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+ System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ CF_RENAME_PROP + " property. Further, filters will only use the "
+ + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+ + "the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
+ " -Dmapred.reduce.tasks.speculative.execution=false");
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1440717&r1=1440716&r2=1440717&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Wed Jan 30 22:33:55 2013
@@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,11 +39,14 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -322,4 +328,99 @@ public class TestImportExport {
assertEquals(now, res[6].getTimestamp());
t.close();
}
+
+ @Test
+ public void testWithFilter() throws Exception {
+ String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
+ HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+ desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+ UTIL.getHBaseAdmin().createTable(desc);
+ HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
+
+ Put p = new Put(ROW1);
+ p.add(FAMILYA, QUAL, now, QUAL);
+ p.add(FAMILYA, QUAL, now + 1, QUAL);
+ p.add(FAMILYA, QUAL, now + 2, QUAL);
+ p.add(FAMILYA, QUAL, now + 3, QUAL);
+ p.add(FAMILYA, QUAL, now + 4, QUAL);
+ exportTable.put(p);
+
+ String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1000" };
+
+ GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
+ cluster.getConfiguration()), args);
+ Configuration conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ Job job = Export.createSubmittableJob(conf, args);
+ job.getConfiguration().set("mapreduce.framework.name", "yarn");
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+ String IMPORT_TABLE = "importWithFilter";
+ desc = new HTableDescriptor(IMPORT_TABLE);
+ desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+ UTIL.getHBaseAdmin().createTable(desc);
+
+ HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
+ args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
+ "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, OUTPUT_DIR,
+ "1000" };
+
+ opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ job = Import.createSubmittableJob(conf, args);
+ job.getConfiguration().set("mapreduce.framework.name", "yarn");
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+ // get the count of the source table for rows matching the filter
+ PrefixFilter filter = new PrefixFilter(ROW1);
+ int count = getCount(exportTable, filter);
+
+ Assert.assertEquals("Unexpected row count between export and import tables", count,
+ getCount(importTable, null));
+
+ // and then test that a broken command doesn't bork everything - easier here because we don't
+ // need to re-run the export job
+
+ args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
+ "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
+ OUTPUT_DIR, "1000" };
+
+ opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ job = Import.createSubmittableJob(conf, args);
+ job.getConfiguration().set("mapreduce.framework.name", "yarn");
+ job.waitForCompletion(false);
+ assertFalse("Job succeeedd, but it had a non-instantiable filter!", job.isSuccessful());
+
+ // cleanup
+ exportTable.close();
+ importTable.close();
+ }
+
+ /**
+ * Count the number of KeyValues in the specified table that pass the given filter
+ * @param table table to scan
+ * @param filter filter to apply, or <tt>null</tt> to count all KeyValues
+ * @return number of KeyValues matched
+ * @throws IOException if scanning the table fails
+ */
+ private int getCount(HTable table, Filter filter) throws IOException {
+ Scan scan = new Scan();
+ scan.setFilter(filter);
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ count += res.size();
+ }
+ results.close();
+ return count;
+ }
}
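
Since instantiateFilter resolves the filter through a static
createFilterFromArguments(ArrayList) factory (the same hook the HBase shell
uses), a custom filter meant for this option has to provide that method. A
minimal sketch, with an illustrative package and class name, relying on
FilterBase defaults for everything but filterKeyValue (serialization is
omitted since Import only instantiates the filter locally in the map tasks):

  package org.example.hbase; // illustrative

  import java.util.ArrayList;

  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.filter.Filter;
  import org.apache.hadoop.hbase.filter.FilterBase;
  import org.apache.hadoop.hbase.util.Bytes;

  /** Keeps only KeyValues whose row starts with the configured prefix. */
  public class RowPrefixImportFilter extends FilterBase {
    private final byte[] prefix;

    public RowPrefixImportFilter(byte[] prefix) {
      this.prefix = prefix;
    }

    // Factory looked up reflectively by Import.instantiateFilter; note that
    // getFilterArgs single-quotes each argument, so the quotes are stripped here.
    public static Filter createFilterFromArguments(ArrayList<byte[]> args) {
      String quoted = Bytes.toString(args.get(0));
      return new RowPrefixImportFilter(
          Bytes.toBytes(quoted.substring(1, quoted.length() - 1)));
    }

    @Override
    public ReturnCode filterKeyValue(KeyValue kv) {
      return Bytes.startsWith(kv.getRow(), prefix)
          ? ReturnCode.INCLUDE : ReturnCode.SKIP;
    }
  }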