You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/08/31 07:02:44 UTC

svn commit: r1379291 - in /hbase/branches/0.92/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java

Author: tedyu
Date: Fri Aug 31 05:02:43 2012
New Revision: 1379291

URL: http://svn.apache.org/viewvc?rev=1379291&view=rev
Log:
HBASE-6239 [replication] ReplicationSink uses the ts of the first KV for the other KVs in the same row (J-D)


Modified:
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1379291&r1=1379290&r2=1379291&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Aug 31 05:02:43 2012
@@ -135,16 +135,15 @@ public class ReplicationSink {
           }
           // With mini-batching, we need to expect multiple rows per edit
           byte[] lastKey = kvs.get(0).getRow();
-          Put put = new Put(kvs.get(0).getRow(),
-              kvs.get(0).getTimestamp());
+          Put put = new Put(kvs.get(0).getRow());
           put.setClusterId(entry.getKey().getClusterId());
           for (KeyValue kv : kvs) {
             if (!Bytes.equals(lastKey, kv.getRow())) {
               tableList.add(put);
-              put = new Put(kv.getRow(), kv.getTimestamp());
+              put = new Put(kv.getRow());
               put.setClusterId(entry.getKey().getClusterId());
             }
-            put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
             lastKey = kv.getRow();
           }
           tableList.add(put);

Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java?rev=1379291&r1=1379290&r2=1379291&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java Fri Aug 31 05:02:43 2012
@@ -199,6 +199,35 @@ public class TestReplicationSink {
   }
 
   /**
+   * This test makes sure that all the KVs belonging to a single row in a WALEdit
+   * retain their individual timestamps
+   * @throws Exception
+   */
+  @Test
+  public void testTsMix() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[3];
+    long now = System.currentTimeMillis();
+    for(int i = 0; i < 3; i++) {
+      entries[i] = createEntry(TABLE_NAME1, 1, i, KeyValue.Type.Put, now+i);
+    }
+    // Merge all three KVs into the first entry's WALEdit so one edit carries several timestamps
+    entries[0].getEdit().add(entries[1].getEdit().getKeyValues().get(0));
+    entries[0].getEdit().add(entries[2].getEdit().getKeyValues().get(0));
+    HLog.Entry[] entry = new HLog.Entry[1];
+    entry[0] = entries[0];
+    SINK.replicateEntries(entry);
+
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    Result row = scanRes.next();
+    for(int i = 0; i < 3; i++) {
+      LOG.info(row.raw()[i]);
+      LOG.info("Comparing " + (now+i) + " with " + row.raw()[i].getTimestamp());
+      assertEquals(now+i, row.raw()[i].getTimestamp());
+    }
+  }
+
+  /**
    * Puts are buffered, but this tests when a delete (not-buffered) is applied
    * before the actual Put that creates it.
    * @throws Exception
@@ -219,30 +248,37 @@ public class TestReplicationSink {
     assertEquals(0, res.size());
   }
 
-  private HLog.Entry createEntry(byte [] table, int row,  KeyValue.Type type) {
+  private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+    return createEntry(table, row, row, type, -1);
+  }
+
+  private HLog.Entry createEntry(byte [] table, int row, int qualifier, KeyValue.Type type, long ts) {
+    byte[] qual = Bytes.toBytes(qualifier);
     byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
     byte[] rowBytes = Bytes.toBytes(row);
-    // Just make sure we don't get the same ts for two consecutive rows with
-    // same key
-    try {
-      Thread.sleep(1);
-    } catch (InterruptedException e) {
-      LOG.info("Was interrupted while sleep, meh", e);
+    if (ts == -1) {
+      // Just make sure we don't get the same ts for two consecutive rows with
+      // same key
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        LOG.info("Was interrupted while sleep, meh", e);
+      }
+      ts = System.currentTimeMillis();
     }
-    final long now = System.currentTimeMillis();
     KeyValue kv = null;
     if(type.getCode() == KeyValue.Type.Put.getCode()) {
-      kv = new KeyValue(rowBytes, fam, fam, now,
+      kv = new KeyValue(rowBytes, fam, qual, ts,
           KeyValue.Type.Put, Bytes.toBytes(row));
     } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
-        kv = new KeyValue(rowBytes, fam, fam,
-            now, KeyValue.Type.DeleteColumn);
+        kv = new KeyValue(rowBytes, fam, qual,
+            ts, KeyValue.Type.DeleteColumn);
     } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
         kv = new KeyValue(rowBytes, fam, null,
-            now, KeyValue.Type.DeleteFamily);
+            ts, KeyValue.Type.DeleteFamily);
     }
 
-    HLogKey key = new HLogKey(table, table, now, now,
+    HLogKey key = new HLogKey(table, table, ts, ts,
         HConstants.DEFAULT_CLUSTER_ID);
 
     WALEdit edit = new WALEdit();