You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2013/01/30 23:33:55 UTC

svn commit: r1440717 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/Import.java test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Author: jyates
Date: Wed Jan 30 22:33:55 2013
New Revision: 1440717

URL: http://svn.apache.org/viewvc?rev=1440717&view=rev
Log:
HBASE-7702: Add filtering to Import jobs

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1440717&r1=1440716&r2=1440717&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Jan 30 22:33:55 2013
@@ -19,20 +19,31 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,10 +61,15 @@ import org.apache.zookeeper.KeeperExcept
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Import {
+  private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
-  private static final Log LOG = LogFactory.getLog(Import.class);
+  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+  // Optional filter to use for mappers
+  private static Filter filter;
 
   /**
    * A mapper that just writes out KeyValues.
@@ -76,6 +92,10 @@ public class Import {
     throws IOException {
       try {
         for (KeyValue kv : value.raw()) {
+          kv = filterKv(kv);
+          // skip if we filtered it out
+          if (kv == null) continue;
+
           context.write(row, convertKv(kv, cfRenameMap));
         }
       } catch (InterruptedException e) {
@@ -86,6 +106,7 @@ public class Import {
     @Override
     public void setup(Context context) {
       cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
     }
   }
 
@@ -121,6 +142,10 @@ public class Import {
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
+        kv = filterKv(kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
         kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
@@ -149,6 +174,8 @@ public class Import {
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+
       try {
         HConnection connection = HConnectionManager.getConnection(conf);
         ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -165,6 +192,77 @@ public class Import {
     }
   }
 
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}), so that
+   * keys can optionally be excluded from the job output
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+   * @throws RuntimeException if the configured filter cannot be instantiated
+   */
+  private static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (filterClass == null) {
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter:" + filterClass);
+
+    try {
+      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) m.invoke(null, getFilterArgs(conf));
+    } catch (IllegalAccessException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+    ArrayList<byte[]> args = new ArrayList<byte[]>();
+    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    for (String arg : sargs) {
+      // all the filters' instantiation methods expect quoted args since they are coming from
+      // the shell, so add the quotes here, though it shouldn't really be needed :-/
+      args.add(Bytes.toBytes("'" + arg + "'"));
+    }
+    return args;
+  }
+
+  /**
+   * Attempt to filter out the keyvalue
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+   *         {@link KeyValue}
+   */
+  private static KeyValue filterKv(KeyValue kv) {
+    // apply the filter and skip this kv if the filter doesn't apply
+    if (filter != null) {
+      Filter.ReturnCode code = filter.filterKeyValue(kv);
+      System.out.println("Filter returned:" + code);
+      // if it's not an accept type, then skip this kv
+      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+        if (LOG.isDebugEnabled()) {
+          System.out.println("Skipping key: " + kv + " from filter decision: " + code);
+        }
+        return null;
+      }
+    }
+    return kv;
+  }
+
   // helper: create a new KeyValue based on CF rename map
   private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
     if(cfRenameMap != null) {
@@ -244,13 +342,33 @@ public class Import {
     }
     conf.set(CF_RENAME_PROP, sb.toString());
   }
-  
+
+  /**
+   * Add a Filter to be instantiated on import
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass to instantiate on the server.
+   * @param args List of arguments to pass to the filter on instantiation
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> args) {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+    // build the param string for the key
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < args.size(); i++) {
+      String arg = args.get(i);
+      builder.append(arg);
+      if (i != args.size() - 1) {
+        builder.append(",");
+      }
+    }
+    conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+  }
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
@@ -263,6 +381,17 @@ public class Import {
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJars(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
     if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
@@ -295,6 +424,15 @@ public class Import {
     System.err.println("By default Import will load data directly into HBase. To instead generate");
     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+        + CF_RENAME_PROP + " property. Futher, filters will only use the"
+        + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+        + "the KeyValue.");
     System.err.println("For performance consider the following options:\n"
         + "  -Dmapred.map.tasks.speculative.execution=false\n"
         + "  -Dmapred.reduce.tasks.speculative.execution=false");

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1440717&r1=1440716&r2=1440717&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Wed Jan 30 22:33:55 2013
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,11 +39,14 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -322,4 +328,99 @@ public class TestImportExport {
     assertEquals(now, res[6].getTimestamp());
     t.close();
   }
+
+  @Test
+  public void testWithFilter() throws Exception {
+    String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
+    HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+    UTIL.getHBaseAdmin().createTable(desc);
+    HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
+
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now + 1, QUAL);
+    p.add(FAMILYA, QUAL, now + 2, QUAL);
+    p.add(FAMILYA, QUAL, now + 3, QUAL);
+    p.add(FAMILYA, QUAL, now + 4, QUAL);
+    exportTable.put(p);
+
+    String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1000" };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
+        cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.getConfiguration().set("mapreduce.framework.name", "yarn");
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    String IMPORT_TABLE = "importWithFilter";
+    desc = new HTableDescriptor(IMPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+    UTIL.getHBaseAdmin().createTable(desc);
+
+    HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
+    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, OUTPUT_DIR,
+        "1000" };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.getConfiguration().set("mapreduce.framework.name", "yarn");
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    // get the count of keyvalues in the source table that match the same filter
+    PrefixFilter filter = new PrefixFilter(ROW1);
+    int count = getCount(exportTable, filter);
+
+    Assert.assertEquals("Unexpected row count between export and import tables", count,
+      getCount(importTable, null));
+
+    // and then test that a broken command doesn't bork everything - easier here because we don't
+    // need to re-run the export job
+
+    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
+        OUTPUT_DIR, "1000" };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.getConfiguration().set("mapreduce.framework.name", "yarn");
+    job.waitForCompletion(false);
+    assertFalse("Job succeeedd, but it had a non-instantiable filter!", job.isSuccessful());
+
+    // cleanup
+    exportTable.close();
+    importTable.close();
+  }
+
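+  /**
+   * Count the number of keyvalues in the specified table, as seen by a scan that uses the
+   * given filter
+   * @param table table to scan
+   * @param filter filter to set on the scan, may be <tt>null</tt>
+   * @return the sum of the sizes of all results returned by the scan
+   * @throws IOException if scanning the table fails
+   */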
+  private int getCount(HTable table, Filter filter) throws IOException {
+    Scan scan = new Scan();
+    scan.setFilter(filter);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      count += res.size();
+    }
+    results.close();
+    return count;
+  }
 }