You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/01/29 03:50:39 UTC

svn commit: r1562342 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/Import.java test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Author: tedyu
Date: Wed Jan 29 02:50:38 2014
New Revision: 1562342

URL: http://svn.apache.org/r1562342
Log:
HBASE-10416 Improvements to the import flow


Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1562342&r1=1562341&r2=1562342&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Jan 29 02:50:38 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.zookeeper.KeeperException;
 
+
 /**
  * Import data written by {@link Export}.
  */
@@ -65,40 +68,38 @@ import org.apache.zookeeper.KeeperExcept
 public class Import {
   private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
-  final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
-  final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
-  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
-  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
-
-  // Optional filter to use for mappers
-  private static Filter filter;
+  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+  public final static String TABLE_NAME = "import.table.name";
+  public final static String WAL_DURABILITY = "import.wal.durability";
 
   /**
    * A mapper that just writes out KeyValues.
    */
-  static class KeyValueImporter
-  extends TableMapper<ImmutableBytesWritable, KeyValue> {
+  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
     private Map<byte[], byte[]> cfRenameMap;
-      
+    private Filter filter;
     /**
      * @param row  The current table row key.
      * @param value  The columns.
      * @param context  The current context.
      * @throws IOException When something is broken with the data.
-     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
-     *   org.apache.hadoop.mapreduce.Mapper.Context)
      */
     @Override
     public void map(ImmutableBytesWritable row, Result value,
       Context context)
     throws IOException {
       try {
-        for (Cell kv : value.rawCells()) {
-          kv = filterKv(kv);
-          // skip if we filtered it out
-          if (kv == null) continue;
-          // TODO get rid of ensureKeyValue
-          context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+        if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+          }
         }
       } catch (InterruptedException e) {
         e.printStackTrace();
@@ -115,18 +116,17 @@ public class Import {
   /**
    * Write table content out to files in hdfs.
    */
-  static class Importer
-  extends TableMapper<ImmutableBytesWritable, Mutation> {
+  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
     private List<UUID> clusterIds;
+    private Filter filter;
+    private Durability durability;
 
     /**
      * @param row  The current table row key.
      * @param value  The columns.
      * @param context  The current context.
      * @throws IOException When something is broken with the data.
-     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
-     *   org.apache.hadoop.mapreduce.Mapper.Context)
      */
     @Override
     public void map(ImmutableBytesWritable row, Result value,
@@ -143,32 +143,40 @@ public class Import {
     throws IOException, InterruptedException {
       Put put = null;
       Delete delete = null;
-      for (Cell kv : result.rawCells()) {
-        kv = filterKv(kv);
-        // skip if we filter it out
-        if (kv == null) continue;
-
-        kv = convertKv(kv, cfRenameMap);
-        // Deletes and Puts are gathered and written when finished
-        if (CellUtil.isDelete(kv)) {
-          if (delete == null) {
-            delete = new Delete(key.get());
+      if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
+        for (Cell kv : result.rawCells()) {
+          kv = filterKv(filter, kv);
+          // skip if we filter it out
+          if (kv == null) continue;
+
+          kv = convertKv(kv, cfRenameMap);
+          // Deletes and Puts are gathered and written when finished
+          if (CellUtil.isDelete(kv)) {
+            if (delete == null) {
+              delete = new Delete(key.get());
+            }
+            delete.addDeleteMarker(kv);
+          } else {
+            if (put == null) {
+              put = new Put(key.get());
+            }
+            put.add(kv);
           }
-          delete.addDeleteMarker(kv);
-        } else {
-          if (put == null) { 
-            put = new Put(key.get());
+        }
+        if (put != null) {
+          if (durability != null) {
+            put.setDurability(durability);
           }
-          put.add(kv);
+          put.setClusterIds(clusterIds);
+          context.write(key, put);
+        }
+        if (delete != null) {
+          if (durability != null) {
+            delete.setDurability(durability);
+          }
+          delete.setClusterIds(clusterIds);
+          context.write(key, delete);
         }
-      }
-      if (put != null) {
-        put.setClusterIds(clusterIds);
-        context.write(key, put);
-      }
-      if (delete != null) {
-        delete.setClusterIds(clusterIds);
-        context.write(key, delete);
       }
     }
 
@@ -177,6 +185,10 @@ public class Import {
       Configuration conf = context.getConfiguration();
       cfRenameMap = createCfRenameMap(conf);
       filter = instantiateFilter(conf);
+      String durabilityStr = conf.get(WAL_DURABILITY);
+      if(durabilityStr != null){
+        durability = Durability.valueOf(durabilityStr.toUpperCase());
+      }
       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
       ZooKeeperWatcher zkw = null;
       try {
@@ -201,18 +213,19 @@ public class Import {
    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
    * @throws IllegalArgumentException if the filter is misconfigured
    */
-  private static Filter instantiateFilter(Configuration conf) {
-    // get the filter, if it was configured
+  public static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured    
     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
     if (filterClass == null) {
       LOG.debug("No configured filter class, accepting all keyvalues.");
       return null;
     }
     LOG.debug("Attempting to create filter:" + filterClass);
-
+    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
     try {
       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
-      return (Filter) m.invoke(null, getFilterArgs(conf));
+      return (Filter) m.invoke(null, quotedArgs);
     } catch (IllegalAccessException e) {
       LOG.error("Couldn't instantiate filter!", e);
       throw new RuntimeException(e);
@@ -231,15 +244,14 @@ public class Import {
     }
   }
 
-  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
-    ArrayList<byte[]> args = new ArrayList<byte[]>();
-    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
-    for (String arg : sargs) {
+  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
+    ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
+    for (String stringArg : stringArgs) {
       // all the filters' instantiation methods expected quoted args since they are coming from
-      // the shell, so add them here, though its shouldn't really be needed :-/
-      args.add(Bytes.toBytes("'" + arg + "'"));
+      // the shell, so add them here, though it shouldn't really be needed :-/
+      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
     }
-    return args;
+    return quotedArgs;
   }
 
   /**
@@ -248,7 +260,7 @@ public class Import {
    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
    *         {@link KeyValue}
    */
-  private static Cell filterKv(Cell kv) throws IOException {
+  public static Cell filterKv(Filter filter, Cell kv) throws IOException {
     // apply the filter and skip this kv if the filter doesn't apply
     if (filter != null) {
       Filter.ReturnCode code = filter.filterKeyValue(kv);
@@ -347,22 +359,12 @@ public class Import {
    * Add a Filter to be instantiated on import
    * @param conf Configuration to update (will be passed to the job)
    * @param clazz {@link Filter} subclass to instantiate on the server.
-   * @param args List of arguments to pass to the filter on instantiation
+   * @param filterArgs List of arguments to pass to the filter on instantiation
    */
   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
-      List<String> args) {
+      List<String> filterArgs) throws IOException {
     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
-
-    // build the param string for the key
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < args.size(); i++) {
-      String arg = args.get(i);
-      builder.append(arg);
-      if (i != args.size() - 1) {
-        builder.append(",");
-      }
-    }
-    conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
   }
 
   /**
@@ -375,6 +377,7 @@ public class Import {
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
     String tableName = args[0];
+    conf.set(TABLE_NAME, tableName);
     Path inputDir = new Path(args[1]);
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(Importer.class);
@@ -430,12 +433,42 @@ public class Import {
     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
         + CF_RENAME_PROP + " property. Futher, filters will only use the"
-        + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
-        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
-        + "the KeyValue.");
+        + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
+        + " whether the current row needs to be ignored completely for processing and "
+        + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+        + " the KeyValue.");
     System.err.println("For performance consider the following options:\n"
         + "  -Dmapred.map.tasks.speculative.execution=false\n"
-        + "  -Dmapred.reduce.tasks.speculative.execution=false");
+        + "  -Dmapred.reduce.tasks.speculative.execution=false\n"
+        + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+            +" Allowed values are the supported durability values"
+            +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
+  }
+
+  /**
+   * Flushes the table named by {@link #TABLE_NAME} when data was imported straight into hbase (no
+   * {@link #BULK_OUTPUT_CONF_KEY}) with {@link Durability#SKIP_WAL}: such edits are held only in
+   * memory and are not in the Write Ahead Log, so they could not be replayed after a crash.
+   * @param conf configuration holding the table name, bulk output and durability settings
+   */
+  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
+      InterruptedException {
+    String tableName = conf.get(TABLE_NAME);
+    HBaseAdmin hAdmin = null;
+    String durability = conf.get(WAL_DURABILITY);
+    // Need to flush if the data is written to hbase and skip wal is enabled.
+    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
+        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
+      try {
+        hAdmin = new HBaseAdmin(conf);
+        hAdmin.flush(tableName);
+      } finally {
+        if (hAdmin != null) {
+          hAdmin.close();
+        }
+      }
+    }
   }
 
   /**
@@ -456,6 +489,11 @@ public class Import {
       conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
     }
     Job job = createSubmittableJob(conf, otherArgs);
+    boolean isJobSuccessful = job.waitForCompletion(true);
+    if(isJobSuccessful){
+      // Flush the table's regions; this only does work for direct (non-bulk) SKIP_WAL imports
+      flushRegionsIfNecessary(conf);
+    }
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    System.exit(isJobSuccessful ? 0 : 1); // reuse the result instead of waiting on the job twice
   }
 }

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1562342&r1=1562341&r2=1562342&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Wed Jan 29 02:50:38 2014
@@ -40,11 +40,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -56,6 +58,10 @@ import org.apache.hadoop.hbase.filter.Fi
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.mapreduce.Job;
@@ -353,6 +359,13 @@ public class TestImportExport {
     p.add(FAMILYA, QUAL, now + 4, QUAL);
     exportTable.put(p);
 
+    // Having another row would actually test the filter.
+    p = new Put(ROW2);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    exportTable.put(p);
+    // Flush the commits.
+    exportTable.flushCommits();
+
     // Export the simple table
     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
     assertTrue(runExport(args));
@@ -512,7 +525,7 @@ public class TestImportExport {
    * parameters into Configuration
    */
   @Test
-  public void testAddFilterAndArguments() {
+  public void testAddFilterAndArguments() throws IOException {
     Configuration configuration = new Configuration();
 
     List<String> args = new ArrayList<String>();
@@ -524,4 +537,120 @@ public class TestImportExport {
         configuration.get(Import.FILTER_CLASS_CONF_KEY));
     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
   }
-}
+
+  @Test
+  public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
+    // Create an export table.
+    String exportTableName = "exporttestDurability";
+    HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);
+
+    // Insert some data
+    Put put = new Put(ROW1);
+    put.add(FAMILYA, QUAL, now, QUAL);
+    put.add(FAMILYA, QUAL, now + 1, QUAL);
+    put.add(FAMILYA, QUAL, now + 2, QUAL);
+    exportTable.put(put);
+
+    put = new Put(ROW2);
+    put.add(FAMILYA, QUAL, now, QUAL);
+    put.add(FAMILYA, QUAL, now + 1, QUAL);
+    put.add(FAMILYA, QUAL, now + 2, QUAL);
+    exportTable.put(put);
+
+    // Run the export
+    String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
+    assertTrue(runExport(args));
+
+    // Create the table for import
+    String importTableName = "importTestDurability1";
+    HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
+
+    // Register the hlog listener for the import table
+    TableWALActionListener walListener = new TableWALActionListener(importTableName);
+    HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
+    hLog.registerWALActionsListener(walListener);
+
+    // Run the import with SKIP_WAL
+    args =
+        new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
+            importTableName, FQ_OUTPUT_DIR };
+    assertTrue(runImport(args));
+    //Assert that the wal is not visited
+    assertTrue(!walListener.isWALVisited());
+    //Ensure that the count is 2 (only one version of key value is obtained)
+    assertTrue(getCount(importTable, null) == 2);
+
+    // Run the import with the default durability option
+    importTableName = "importTestDurability2";
+    importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
+    hLog.unregisterWALActionsListener(walListener);    
+    walListener = new TableWALActionListener(importTableName);
+    hLog.registerWALActionsListener(walListener);
+    args = new String[] { importTableName, FQ_OUTPUT_DIR };
+    assertTrue(runImport(args));
+    //Assert that the wal is visited
+    assertTrue(walListener.isWALVisited());
+    //Ensure that the count is 2 (only one version of key value is obtained)
+    assertTrue(getCount(importTable, null) == 2);
+  }
+
+  /**
+   * Records, via {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)}, whether
+   * any Write Ahead Log entry was seen for the given table; every other callback is a no-op.
+   */
+  private static class TableWALActionListener implements WALActionsListener {
+    // isVisited is volatile: the WAL callback may run on a different thread than the reader.
+    private final String tableName;
+    private volatile boolean isVisited = false;
+
+    public TableWALActionListener(String tableName) {
+      this.tableName = tableName;
+    }
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void logRollRequested() {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void logCloseRequested() {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
+      // Not interested in this method.
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+      if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
+        isVisited = true;
+      }
+    }
+
+    public boolean isWALVisited() {
+      return isVisited;
+    }
+  }
+}
\ No newline at end of file