Posted to commits@hbase.apache.org by la...@apache.org on 2011/12/09 00:16:56 UTC
svn commit: r1212181 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/
Author: larsh
Date: Thu Dec 8 23:16:55 2011
New Revision: 1212181
URL: http://svn.apache.org/viewvc?rev=1212181&view=rev
Log:
HBASE-4682 Support deleted rows using Import/Export (Lars H)
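In short: Export gains a raw-scan switch so that delete markers are included in the export, and Import learns to replay those markers as Delete mutations instead of folding everything into Puts. A raw export might then be invoked along these lines (argument order follows the Export usage text; the table name, output path, and version count are hypothetical):

    $ hbase org.apache.hadoop.hbase.mapreduce.Export \
        -D hbase.mapreduce.include.deleted.rows=true \
        myTable /export/myTable 1000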
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java Thu Dec 8 23:16:55 2011
@@ -119,6 +119,25 @@ public class Delete extends Mutation
}
/**
+ * Advanced use only. Create a Delete object based on a KeyValue
+ * of type "delete".
+ * @param kv
+ * @throws IOException
+ */
+ public Delete(KeyValue kv) throws IOException {
+ this(kv.getRow(), kv.getTimestamp(), null);
+ if (!kv.isDelete()) {
+ throw new IOException("The recently added KeyValue is not of type "
+ + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+ }
+ // can't use singletonList, because this might be modified at the server by
+ // coprocessors
+ ArrayList<KeyValue> list = new ArrayList<KeyValue>(1);
+ list.add(kv);
+ familyMap.put(kv.getFamily(), list);
+ }
+
+ /**
* Delete all versions of all columns of the specified family.
* <p>
* Overrides previous calls to deleteColumn and deleteColumns for the
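A minimal sketch of the new constructor in use (row, family, and qualifier names are hypothetical; assumes the usual org.apache.hadoop.hbase imports). A KeyValue of any non-delete type is rejected with an IOException, as the check above shows:

    // build a delete-type KeyValue, then wrap it in a Delete
    KeyValue deleteMarker = new KeyValue(Bytes.toBytes("row1"),
        Bytes.toBytes("fam"), Bytes.toBytes("qual"),
        System.currentTimeMillis(), KeyValue.Type.Delete);
    Delete d = new Delete(deleteMarker); // accepted: deleteMarker.isDelete() is true
    table.delete(d);                     // table is an existing HTable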
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java Thu Dec 8 23:16:55 2011
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -165,6 +164,9 @@ public class Scan extends OperationWithA
addFamily(fam);
}
}
+ for (Map.Entry<String, byte[]> attr : scan.getAttributesMap().entrySet()) {
+ setAttribute(attr.getKey(), attr.getValue());
+ }
}
/**
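The Scan copy constructor previously dropped attributes; copying them matters for this feature because the raw-scan flag set by Scan.setRaw() is stored as a Scan attribute, so it must survive when the MapReduce layer clones the Scan. A minimal sketch of the behavior after this change:

    Scan original = new Scan();
    original.setRaw(true);          // recorded internally as a Scan attribute
    Scan copy = new Scan(original); // attributes are now copied as well
    assert copy.isRaw();            // the raw flag survives the copy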
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Thu Dec 8 23:16:55 2011
@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFac
public class Export {
private static final Log LOG = LogFactory.getLog(Export.class);
final static String NAME = "export";
+ final static String RAW_SCAN="hbase.mapreduce.include.deleted.rows";
/**
* Mapper.
@@ -115,6 +116,11 @@ public class Export {
// Set cache blocks
s.setCacheBlocks(false);
// Set Scan Column Family
+ boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+ if (raw) {
+ s.setRaw(raw);
+ }
+
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
}
@@ -124,8 +130,8 @@ public class Export {
LOG.info("Setting Scan Filter for Export.");
s.setFilter(exportFilter);
}
- LOG.info("verisons=" + versions + ", starttime=" + startTime +
- ", endtime=" + endTime);
+ LOG.info("versions=" + versions + ", starttime=" + startTime +
+ ", endtime=" + endTime + ", keepDeletedCells=" + raw);
return s;
}
@@ -159,6 +165,7 @@ public class Export {
System.err.println(" Additionally, the following SCAN properties can be specified");
System.err.println(" to control/limit what is exported..");
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+ System.err.println(" -D " + RAW_SCAN + "=true");
}
/**
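For reference, a raw export can be requested either with the -D switch shown in the usage text above or programmatically before building the job. A minimal sketch (RAW_SCAN is package-private, so external code would use the literal key; the table name and output path are hypothetical):

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.mapreduce.include.deleted.rows", true);
    Job job = Export.createSubmittableJob(conf,
        new String[] { "myTable", "/export/myTable" });
    job.waitForCompletion(true);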
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1212181&r1=1212180&r2=1212181&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Thu Dec 8 23:16:55 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -47,7 +49,7 @@ public class Import {
* Write table content out to files in hdfs.
*/
static class Importer
- extends TableMapper<ImmutableBytesWritable, Put> {
+ extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
/**
@@ -63,15 +65,15 @@ public class Import {
Context context)
throws IOException {
try {
- context.write(row, resultToPut(row, value));
+ writeResult(row, value, context);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- private Put resultToPut(ImmutableBytesWritable key, Result result)
- throws IOException {
- Put put = new Put(key.get());
+ private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+ throws IOException, InterruptedException {
+ Put put = null;
for (KeyValue kv : result.raw()) {
if(cfRenameMap != null) {
// If there's a rename mapping for this CF, create a new KeyValue
@@ -93,11 +95,24 @@ public class Import {
kv.getValueLength()); // value length
}
}
- put.add(kv);
+ if (kv.isDelete()) {
+ // Deletes need to be written one-by-one,
+ // since family deletes overwrite column(s) deletes
+ context.write(key, new Delete(kv));
+ } else {
+ // Puts are gathered into a single Put object
+ // and written when finished
+ if (put == null) {
+ put = new Put(key.get());
+ }
+ put.add(kv);
+ }
+ }
+ if (put != null) {
+ context.write(key, put);
}
- return put;
}
-
+
@Override
public void setup(Context context) {
// Make a map from sourceCfName to destCfName by parsing a config key
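The one-by-one handling of deletes above follows from Delete's own semantics (see the Delete.java javadoc earlier in this commit): a family-level delete overrides any column-level deletes already added to the same Delete object, so batching all exported markers into a single Delete could silently drop some of them. A minimal sketch (row, family, and qualifier are hypothetical):

    Delete d = new Delete(row);
    d.deleteColumns(fam, qual, ts); // column-level marker
    d.deleteFamily(fam);            // overrides the column-level marker above
    // only the family marker survives, hence one context.write() per delete KeyValue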
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1212181&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Thu Dec 8 23:16:55 2011
@@ -0,0 +1,217 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
+@Category(MediumTests.class)
+public class TestImportExport {
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final String FAMILYA_STRING = "a";
+ private static final String FAMILYB_STRING = "b";
+ private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+ private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+ private static final byte[] QUAL = Bytes.toBytes("q");
+ private static final String OUTPUT_DIR = "outputdir";
+
+ private static MiniHBaseCluster cluster;
+ private static long now = System.currentTimeMillis();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ cluster = UTIL.startMiniCluster();
+ UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniMapReduceCluster();
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ @After
+ public void cleanup() throws Exception {
+ FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+ fs.delete(new Path(OUTPUT_DIR), true);
+ }
+
+ /**
+ * Test simple replication case with column mapping
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleCase() throws Exception {
+ String EXPORT_TABLE = "exportSimpleCase";
+ HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA);
+ Put p = new Put(ROW1);
+ p.add(FAMILYA, QUAL, now, QUAL);
+ p.add(FAMILYA, QUAL, now+1, QUAL);
+ p.add(FAMILYA, QUAL, now+2, QUAL);
+ t.put(p);
+ p = new Put(ROW2);
+ p.add(FAMILYA, QUAL, now, QUAL);
+ p.add(FAMILYA, QUAL, now+1, QUAL);
+ p.add(FAMILYA, QUAL, now+2, QUAL);
+ t.put(p);
+
+ String[] args = new String[] {
+ EXPORT_TABLE,
+ OUTPUT_DIR,
+ "1000"
+ };
+
+ GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ Configuration conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ Job job = Export.createSubmittableJob(conf, args);
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+
+ String IMPORT_TABLE = "importTableSimpleCase";
+ t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
+ args = new String[] {
+ "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
+ IMPORT_TABLE,
+ OUTPUT_DIR
+ };
+
+ opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ job = Import.createSubmittableJob(conf, args);
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+ Get g = new Get(ROW1);
+ g.setMaxVersions();
+ Result r = t.get(g);
+ assertEquals(3, r.size());
+ g = new Get(ROW2);
+ g.setMaxVersions();
+ r = t.get(g);
+ assertEquals(3, r.size());
+ }
+
+ @Test
+ public void testWithDeletes() throws Exception {
+ String EXPORT_TABLE = "exportWithDeletes";
+ HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+ desc.addFamily(new HColumnDescriptor(FAMILYA,
+ HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+ 5, /* versions */
+ true /* keep deleted cells */,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE,
+ HColumnDescriptor.DEFAULT_TTL,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HConstants.REPLICATION_SCOPE_LOCAL));
+ UTIL.getHBaseAdmin().createTable(desc);
+ HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
+
+ Put p = new Put(ROW1);
+ p.add(FAMILYA, QUAL, now, QUAL);
+ p.add(FAMILYA, QUAL, now+1, QUAL);
+ p.add(FAMILYA, QUAL, now+2, QUAL);
+ p.add(FAMILYA, QUAL, now+3, QUAL);
+ p.add(FAMILYA, QUAL, now+4, QUAL);
+ t.put(p);
+
+ Delete d = new Delete(ROW1, now+3, null);
+ t.delete(d);
+ d = new Delete(ROW1);
+ d.deleteColumns(FAMILYA, QUAL, now+2);
+ t.delete(d);
+
+ String[] args = new String[] {
+ "-D" + Export.RAW_SCAN + "=true",
+ EXPORT_TABLE,
+ OUTPUT_DIR,
+ "1000"
+ };
+
+ GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ Configuration conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ Job job = Export.createSubmittableJob(conf, args);
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+
+ String IMPORT_TABLE = "importWithDeletes";
+ desc = new HTableDescriptor(IMPORT_TABLE);
+ desc.addFamily(new HColumnDescriptor(FAMILYA,
+ HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+ 5, /* versions */
+ true /* keep deleted cells */,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE,
+ HColumnDescriptor.DEFAULT_TTL,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HConstants.REPLICATION_SCOPE_LOCAL));
+ UTIL.getHBaseAdmin().createTable(desc);
+ t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
+ args = new String[] {
+ IMPORT_TABLE,
+ OUTPUT_DIR
+ };
+
+ opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+ conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ job = Import.createSubmittableJob(conf, args);
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+ Scan s = new Scan();
+ s.setMaxVersions();
+ s.setRaw(true);
+ ResultScanner scanner = t.getScanner(s);
+ Result r = scanner.next();
+ KeyValue[] res = r.raw();
+ assertTrue(res[0].isDeleteFamily());
+ assertEquals(now+4, res[1].getTimestamp());
+ assertEquals(now+3, res[2].getTimestamp());
+ assertTrue(res[3].isDelete());
+ assertEquals(now+2, res[4].getTimestamp());
+ assertEquals(now+1, res[5].getTimestamp());
+ assertEquals(now, res[6].getTimestamp());
+ }
+}
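A note on the final assertions: the expected ordering follows KeyValue sort rules. The family-delete marker carries an empty qualifier and so sorts ahead of every cell in the column; within the column, cells sort by descending timestamp, and a delete marker sorts before a put at the same timestamp, which is why the column delete at now+2 (res[3]) precedes the put at now+2 (res[4]).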