You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/08/31 07:02:44 UTC
svn commit: r1379291 - in /hbase/branches/0.92/src:
main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
Author: tedyu
Date: Fri Aug 31 05:02:43 2012
New Revision: 1379291
URL: http://svn.apache.org/viewvc?rev=1379291&view=rev
Log:
HBASE-6239 [replication] ReplicationSink uses the ts of the first KV for the other KVs in the same row (J-D)
Modified:
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1379291&r1=1379290&r2=1379291&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Aug 31 05:02:43 2012
@@ -135,16 +135,15 @@ public class ReplicationSink {
}
// With mini-batching, we need to expect multiple rows per edit
byte[] lastKey = kvs.get(0).getRow();
- Put put = new Put(kvs.get(0).getRow(),
- kvs.get(0).getTimestamp());
+ Put put = new Put(kvs.get(0).getRow());
put.setClusterId(entry.getKey().getClusterId());
for (KeyValue kv : kvs) {
if (!Bytes.equals(lastKey, kv.getRow())) {
tableList.add(put);
- put = new Put(kv.getRow(), kv.getTimestamp());
+ put = new Put(kv.getRow());
put.setClusterId(entry.getKey().getClusterId());
}
- put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+ put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
lastKey = kv.getRow();
}
tableList.add(put);
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java?rev=1379291&r1=1379290&r2=1379291&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java Fri Aug 31 05:02:43 2012
@@ -199,6 +199,35 @@ public class TestReplicationSink {
}
/**
+ * This test makes sure that all the KVs belonging to a single row in a WALEdit
+ * retain their individual timestamps
+ * @throws Exception
+ */
+ @Test
+ public void testTsMix() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[3];
+ long now = System.currentTimeMillis();
+ for(int i = 0; i < 3; i++) {
+ entries[i] = createEntry(TABLE_NAME1, 1, i, KeyValue.Type.Put, now+i);
+ }
+ // Kinda ugly, trying to merge all the entries into one
+ entries[0].getEdit().add(entries[1].getEdit().getKeyValues().get(0));
+ entries[0].getEdit().add(entries[2].getEdit().getKeyValues().get(0));
+ HLog.Entry[] entry = new HLog.Entry[1];
+ entry[0] = entries[0];
+ SINK.replicateEntries(entry);
+
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ Result row = scanRes.next();
+ for(int i = 0; i < 3; i++) {
+ LOG.info(row.raw()[i]);
+ LOG.info("Comparing " + (now+i) + " with " + row.raw()[i].getTimestamp());
+ assertEquals(now+i, row.raw()[i].getTimestamp());
+ }
+ }
+
+ /**
* Puts are buffered, but this tests when a delete (not-buffered) is applied
* before the actual Put that creates it.
* @throws Exception
@@ -219,30 +248,37 @@ public class TestReplicationSink {
assertEquals(0, res.size());
}
- private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+ private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+ return createEntry(table, row, row, type, -1);
+ }
+
+ private HLog.Entry createEntry(byte [] table, int row, int qualifier, KeyValue.Type type, long ts) {
+ byte[] qual = Bytes.toBytes(qualifier);
byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
- // Just make sure we don't get the same ts for two consecutive rows with
- // same key
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- LOG.info("Was interrupted while sleep, meh", e);
+ if (ts == -1) {
+ // Just make sure we don't get the same ts for two consecutive rows with
+ // same key
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ LOG.info("Was interrupted while sleep, meh", e);
+ }
+ ts = System.currentTimeMillis();
}
- final long now = System.currentTimeMillis();
KeyValue kv = null;
if(type.getCode() == KeyValue.Type.Put.getCode()) {
- kv = new KeyValue(rowBytes, fam, fam, now,
+ kv = new KeyValue(rowBytes, fam, qual, ts,
KeyValue.Type.Put, Bytes.toBytes(row));
} else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
- kv = new KeyValue(rowBytes, fam, fam,
- now, KeyValue.Type.DeleteColumn);
+ kv = new KeyValue(rowBytes, fam, qual,
+ ts, KeyValue.Type.DeleteColumn);
} else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
kv = new KeyValue(rowBytes, fam, null,
- now, KeyValue.Type.DeleteFamily);
+ ts, KeyValue.Type.DeleteFamily);
}
- HLogKey key = new HLogKey(table, table, now, now,
+ HLogKey key = new HLogKey(table, table, ts, ts,
HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();