Posted to commits@hbase.apache.org by la...@apache.org on 2011/12/09 00:16:56 UTC

svn commit: r1212181 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: larsh
Date: Thu Dec  8 23:16:55 2011
New Revision: 1212181

URL: http://svn.apache.org/viewvc?rev=1212181&view=rev
Log:
HBASE-4682 Support deleted rows using Import/Export (Lars H)

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java Thu Dec  8 23:16:55 2011
@@ -119,6 +119,25 @@ public class Delete extends Mutation
   }
 
   /**
+   * Advanced use only. Create a Delete object based on a KeyValue
+   * of type "delete".
+   * @param kv
+   * @throws IOException
+   */
+  public Delete(KeyValue kv) throws IOException {
+    this(kv.getRow(), kv.getTimestamp(), null);
+    if (!kv.isDelete()) {
+      throw new IOException("The recently added KeyValue is not of type "
+          + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+    }
+    // can't use singletonList, because this might be modified at the server by
+    // coprocessors
+    ArrayList<KeyValue> list = new ArrayList<KeyValue>(1);
+    list.add(kv);
+    familyMap.put(kv.getFamily(), list);
+  }
+
+  /**
    * Delete all versions of all columns of the specified family.
    * <p>
    * Overrides previous calls to deleteColumn and deleteColumns for the

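For reference, a minimal sketch of how the new constructor can be used from
client code, assuming a Result obtained from a raw scan (the table handle,
class and method names are illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;

    // Replays the delete markers found in a raw-scanned Result onto a
    // target table, one Delete per marker, mirroring what Import now does.
    public class DeleteReplayExample {
      static void replayDeletes(HTable table, Result result) throws IOException {
        for (KeyValue kv : result.raw()) {
          if (kv.isDelete()) {
            // The constructor itself throws IOException if kv is not a
            // delete-type KeyValue.
            table.delete(new Delete(kv));
          }
        }
      }
    }
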
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java Thu Dec  8 23:16:55 2011
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -165,6 +164,9 @@ public class Scan extends OperationWithA
         addFamily(fam);
       }
     }
+    for (Map.Entry<String, byte[]> attr : scan.getAttributesMap().entrySet()) {
+      setAttribute(attr.getKey(), attr.getValue());
+    }
   }
 
   /**

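A short sketch of what the Scan change buys: the copy constructor now carries
attributes over, which matters here because setRaw() appears to be backed by a
scan attribute, so a copied Scan would otherwise silently drop the raw flag.
The custom attribute name below is made up for illustration:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    // Attributes set on a Scan survive the copy constructor after this
    // change, including the attribute behind setRaw().
    public class ScanCopyExample {
      public static void main(String[] args) throws IOException {
        Scan original = new Scan();
        original.setAttribute("my.app.tag", Bytes.toBytes("audit"));
        original.setRaw(true);
        Scan copy = new Scan(original);
        System.out.println(copy.getAttribute("my.app.tag") != null);  // true
        System.out.println(copy.isRaw());                             // true
      }
    }
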
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Thu Dec  8 23:16:55 2011
@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFac
 public class Export {
   private static final Log LOG = LogFactory.getLog(Export.class);
   final static String NAME = "export";
+  final static String RAW_SCAN="hbase.mapreduce.include.deleted.rows";
 
   /**
    * Mapper.
@@ -115,6 +116,11 @@ public class Export {
     // Set cache blocks
     s.setCacheBlocks(false);
     // Set Scan Column Family
+    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+    if (raw) {
+      s.setRaw(raw);
+    }
+    
     if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
       s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
     }
@@ -124,8 +130,8 @@ public class Export {
         LOG.info("Setting Scan Filter for Export.");
       s.setFilter(exportFilter);
     }
-    LOG.info("verisons=" + versions + ", starttime=" + startTime +
-      ", endtime=" + endTime);
+    LOG.info("versions=" + versions + ", starttime=" + startTime +
+      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
     return s;
   }
 
@@ -159,6 +165,7 @@ public class Export {
     System.err.println("  Additionally, the following SCAN properties can be specified");
     System.err.println("  to control/limit what is exported..");
     System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+    System.err.println("   -D " + RAW_SCAN + "=true");
   }
 
   /**

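With that in place, an export that preserves delete markers can be run as
  bin/hbase org.apache.hadoop.hbase.mapreduce.Export -D hbase.mapreduce.include.deleted.rows=true <tablename> <outputdir>
or driven programmatically, as the test below does. A minimal sketch (table
name and output path are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.Export;
    import org.apache.hadoop.mapreduce.Job;

    // RAW_SCAN is package-private, so client code outside the mapreduce
    // package sets the configuration key directly.
    public class RawExportExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.mapreduce.include.deleted.rows", "true");
        Job job = Export.createSubmittableJob(conf,
            new String[] { "mytable", "/export/mytable" });
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
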
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Thu Dec  8 23:16:55 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -47,7 +49,7 @@ public class Import {
    * Write table content out to files in hdfs.
    */
   static class Importer
-  extends TableMapper<ImmutableBytesWritable, Put> {
+  extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
       
     /**
@@ -63,15 +65,15 @@ public class Import {
       Context context)
     throws IOException {
       try {
-        context.write(row, resultToPut(row, value));
+        writeResult(row, value, context);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
 
-    private Put resultToPut(ImmutableBytesWritable key, Result result)
-    throws IOException {
-      Put put = new Put(key.get());
+    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+    throws IOException, InterruptedException {
+      Put put = null;
       for (KeyValue kv : result.raw()) {
         if(cfRenameMap != null) {
             // If there's a rename mapping for this CF, create a new KeyValue
@@ -93,11 +95,24 @@ public class Import {
                         kv.getValueLength());     // value length
             } 
         }
-        put.add(kv);
+        if (kv.isDelete()) {
+          // Deletes need to be written one-by-one,
+          // since family deletes overwrite column(s) deletes
+          context.write(key, new Delete(kv));
+        } else {
+          // Puts are gathered into a single Put object
+          // and written when finished
+          if (put == null) { 
+            put = new Put(key.get());
+          }
+          put.add(kv);
+        }
+      }
+      if (put != null) {
+        context.write(key, put);
       }
-      return put;
     }
-    
+
     @Override
     public void setup(Context context) {
       // Make a map from sourceCfName to destCfName by parsing a config key

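The one-by-one writes for deletes are deliberate. A sketch of the kind of
collision they avoid, using only the existing Delete API (row, family,
qualifier, and timestamp are illustrative):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    // Batching a column delete and a family delete for the same family into
    // one Delete object drops the column delete: per Delete's javadoc above,
    // deleteFamily() overrides previous deleteColumn(s) calls for that
    // family. Emitting one Delete per KeyValue sidesteps this.
    public class DeleteBatchingPitfall {
      public static void main(String[] args) {
        Delete d = new Delete(Bytes.toBytes("row1"));
        d.deleteColumns(Bytes.toBytes("a"), Bytes.toBytes("q"), 42L);
        d.deleteFamily(Bytes.toBytes("a"));  // replaces the column delete above
      }
    }
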
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1212181&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Thu Dec  8 23:16:55 2011
@@ -0,0 +1,217 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
+@Category(MediumTests.class)
+public class TestImportExport {
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final String FAMILYA_STRING = "a";
+  private static final String FAMILYB_STRING = "b";
+  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+  private static final byte[] QUAL = Bytes.toBytes("q");
+  private static final String OUTPUT_DIR = "outputdir";
+
+  private static MiniHBaseCluster cluster;
+  private static long now = System.currentTimeMillis();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    cluster = UTIL.startMiniCluster();
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.delete(new Path(OUTPUT_DIR), true);
+  }
+
+  /**
+   * Test simple replication case with column mapping
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleCase() throws Exception {
+    String EXPORT_TABLE = "exportSimpleCase";
+    HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA);
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    t.put(p);
+    p = new Put(ROW2);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    t.put(p);
+
+    String[] args = new String[] {
+        EXPORT_TABLE,
+        OUTPUT_DIR,
+        "1000"
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+
+    String IMPORT_TABLE = "importTableSimpleCase";
+    t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
+    args = new String[] {
+        "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
+        IMPORT_TABLE,
+        OUTPUT_DIR
+    };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    Get g = new Get(ROW1);
+    g.setMaxVersions();
+    Result r = t.get(g);
+    assertEquals(3, r.size());
+    g = new Get(ROW2);
+    g.setMaxVersions();
+    r = t.get(g);
+    assertEquals(3, r.size());
+  }
+
+  @Test
+  public void testWithDeletes() throws Exception {
+    String EXPORT_TABLE = "exportWithDeletes";
+    HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA,
+        HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        5, /* versions */
+        true /* keep deleted cells */,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,
+        HColumnDescriptor.DEFAULT_TTL,
+        HColumnDescriptor.DEFAULT_BLOOMFILTER,
+        HConstants.REPLICATION_SCOPE_LOCAL));
+    UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
+
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    p.add(FAMILYA, QUAL, now+3, QUAL);
+    p.add(FAMILYA, QUAL, now+4, QUAL);
+    t.put(p);
+
+    Delete d = new Delete(ROW1, now+3, null);
+    t.delete(d);
+    d = new Delete(ROW1);
+    d.deleteColumns(FAMILYA, QUAL, now+2);
+    t.delete(d);
+    
+    String[] args = new String[] {
+        "-D" + Export.RAW_SCAN + "=true",
+        EXPORT_TABLE,
+        OUTPUT_DIR,
+        "1000"
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+
+    String IMPORT_TABLE = "importWithDeletes";
+    desc = new HTableDescriptor(IMPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA,
+        HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        5, /* versions */
+        true /* keep deleted cells */,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,
+        HColumnDescriptor.DEFAULT_TTL,
+        HColumnDescriptor.DEFAULT_BLOOMFILTER,
+        HConstants.REPLICATION_SCOPE_LOCAL));
+    UTIL.getHBaseAdmin().createTable(desc);
+    t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
+    args = new String[] {
+        IMPORT_TABLE,
+        OUTPUT_DIR
+    };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    Scan s = new Scan();
+    s.setMaxVersions();
+    s.setRaw(true);
+    ResultScanner scanner = t.getScanner(s);
+    Result r = scanner.next();
+    KeyValue[] res = r.raw();
+    assertTrue(res[0].isDeleteFamily());
+    assertEquals(now+4, res[1].getTimestamp());
+    assertEquals(now+3, res[2].getTimestamp());
+    assertTrue(res[3].isDelete());
+    assertEquals(now+2, res[4].getTimestamp());
+    assertEquals(now+1, res[5].getTimestamp());
+    assertEquals(now, res[6].getTimestamp());
+  }
+}