You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2011/12/27 02:46:16 UTC

svn commit: r1224852 - in /hbase/branches/0.92: CHANGES.txt src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java

Author: larsh
Date: Tue Dec 27 01:46:16 2011
New Revision: 1224852

URL: http://svn.apache.org/viewvc?rev=1224852&view=rev
Log:
HBASE-5096  Replication does not handle deletes correctly. (Lars H)

Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1224852&r1=1224851&r2=1224852&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Tue Dec 27 01:46:16 2011
@@ -499,6 +499,7 @@ Release 0.92.0 - Unreleased
    HBASE-5078  DistributedLogSplitter failing to split file because it has edits for
                lots of regions
    HBASE-5077  SplitLogWorker fails to let go of a task, kills the RS
+   HBASE-5096  Replication does not handle deletes correctly. (Lars H)
 
   TESTS
    HBASE-4492  TestRollingRestart fails intermittently

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1224852&r1=1224851&r2=1224852&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Dec 27 01:46:16 2011
@@ -22,9 +22,8 @@ package org.apache.hadoop.hbase.replicat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
@@ -109,11 +108,21 @@ public class ReplicationSink {
               kvs.get(0).getTimestamp(), null);
           delete.setClusterId(entry.getKey().getClusterId());
           for (KeyValue kv : kvs) {
-            if (kv.isDeleteFamily()) {
-              delete.deleteFamily(kv.getFamily());
-            } else if (!kv.isEmptyColumn()) {
-              delete.deleteColumn(kv.getFamily(),
-                  kv.getQualifier());
+            switch (Type.codeToType(kv.getType())) {
+            case DeleteFamily:
+              // family marker
+              delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
+              break;
+            case DeleteColumn:
+              // column marker
+              delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
+                  kv.getTimestamp());
+              break;
+            case Delete:
+              // version marker
+              delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
+                  kv.getTimestamp());
+              break;
             }
           }
           delete(entry.getKey().getTablename(), delete);

Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1224852&r1=1224851&r2=1224852&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Tue Dec 27 01:46:16 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -203,6 +204,94 @@ public class TestReplication {
   }
 
   /**
+   * Verify that version and column delete marker types are replicated
+   * correctly.
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testDeleteTypes() throws Exception {
+    LOG.info("testDeleteTypes");
+    final byte[] v1 = Bytes.toBytes("v1");
+    final byte[] v2 = Bytes.toBytes("v2");
+    final byte[] v3 = Bytes.toBytes("v3");
+    htable1 = new HTable(conf1, tableName);
+
+    long t = EnvironmentEdgeManager.currentTimeMillis();
+    // create three versions for "row"
+    Put put = new Put(row);
+    put.add(famName, row, t, v1);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.add(famName, row, t+1, v2);
+    htable1.put(put);
+    
+    put = new Put(row);
+    put.add(famName, row, t+2, v3);
+    htable1.put(put);
+    
+    Get get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() < 3) {
+        LOG.info("Rows not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.raw()[0].getValue(), v3);
+        assertArrayEquals(res.raw()[1].getValue(), v2);
+        assertArrayEquals(res.raw()[2].getValue(), v1);
+        break;
+      }
+    }
+    // place a version delete marker (delete last version)
+    Delete d = new Delete(row);
+    d.deleteColumn(famName, row, t);
+    htable1.delete(d);
+
+    get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() > 2) {
+        LOG.info("Version not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.raw()[0].getValue(), v3);
+        assertArrayEquals(res.raw()[1].getValue(), v2);
+        break;
+      }
+    }
+
+    // place a column delete marker
+    d = new Delete(row);
+    d.deleteColumns(famName, row, t+2);
+    htable1.delete(d);
+
+    // now *both* of the remaining versions should be deleted
+    // at the replica
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Rows not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
    * Add a row, check it's replicated, delete it, check's gone
    * @throws Exception
    */