You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/08/13 10:03:53 UTC
hbase git commit: HBASE-14054 Acknowledged writes may get lost if
regionserver clock is set backwards
Repository: hbase
Updated Branches:
refs/heads/master a399ac9c4 -> d1262331e
HBASE-14054 Acknowledged writes may get lost if regionserver clock is set backwards
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d1262331
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d1262331
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d1262331
Branch: refs/heads/master
Commit: d1262331eb0481ecc128ce78590ca4ff992f94e7
Parents: a399ac9
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Aug 13 11:03:37 2015 +0300
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Aug 13 11:03:37 2015 +0300
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 38 +++++-
.../hadoop/hbase/regionserver/TestHRegion.java | 121 +++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1262331/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0df4bf..37d0f08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -203,7 +203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
-
+
// in milliseconds
private static final String MAX_WAIT_FOR_SEQ_ID_KEY =
"hbase.hregion.max.wait.for.seq.id";
@@ -3288,13 +3288,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
boolean matches = false;
+ long cellTs = 0;
if (result.size() == 0 && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
valueIsNull) {
matches = true;
+ cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
+ cellTs = kv.getTimestamp();
int compareResult = CellComparator.compareValue(kv, comparator);
switch (compareOp) {
case LESS:
@@ -3321,6 +3324,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
//If matches put the new put or delete the new delete
if (matches) {
+ // We have acquired the row lock already. If the system clock is NOT monotonically
+ // non-decreasing (see HBASE-14070) we should make sure that the mutation's timestamp is
+ // not smaller than what was observed via Get. doBatchMutate already does this, but
+ // there is no way to pass the cellTs. See HBASE-14054.
+ long now = EnvironmentEdgeManager.currentTime();
+ long ts = Math.max(now, cellTs); // ensure write is not eclipsed
+ byte[] byteTs = Bytes.toBytes(ts);
+
+ if (w instanceof Put) {
+ updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
+ }
+ // else delete is not needed since it already does a second get, and sets the timestamp
+ // from get (see prepareDeleteTimestamps).
+
// All edits for the given row (across all column families) must
// happen atomically.
doBatchMutate(w);
@@ -3367,13 +3384,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
boolean matches = false;
+ long cellTs = 0;
if (result.size() == 0 && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
valueIsNull) {
matches = true;
+ cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
+ cellTs = kv.getTimestamp();
int compareResult = CellComparator.compareValue(kv, comparator);
switch (compareOp) {
case LESS:
@@ -3400,6 +3420,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
//If matches put the new put or delete the new delete
if (matches) {
+ // We have acquired the row lock already. If the system clock is NOT monotonically
+ // non-decreasing (see HBASE-14070) we should make sure that the mutation's timestamp is
+ // not smaller than what was observed via Get. doBatchMutate already does this, but
+ // there is no way to pass the cellTs. See HBASE-14054.
+ long now = EnvironmentEdgeManager.currentTime();
+ long ts = Math.max(now, cellTs); // ensure write is not eclipsed
+ byte[] byteTs = Bytes.toBytes(ts);
+
+ for (Mutation w : rm.getMutations()) {
+ if (w instanceof Put) {
+ updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
+ }
+ // else delete is not needed since it already does a second get, and sets the timestamp
+ // from get (see prepareDeleteTimestamps).
+ }
+
// All edits for the given row (across all column families) must
// happen atomically.
mutateRow(rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1262331/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 56a9d4b..826c9b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@@ -145,6 +146,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@@ -6240,6 +6242,125 @@ public class TestHRegion {
}
}
+ @Test
+ public void testIncrementTimestampsAreMonotonic() throws IOException {
+ HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
+ ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ edge.setValue(10);
+ Increment inc = new Increment(row);
+ inc.setDurability(Durability.SKIP_WAL);
+ inc.addColumn(fam1, qual1, 1L);
+ region.increment(inc);
+
+ Result result = region.get(new Get(row));
+ Cell c = result.getColumnLatestCell(fam1, qual1);
+ assertNotNull(c);
+ assertEquals(c.getTimestamp(), 10L);
+
+ edge.setValue(1); // clock goes back
+ region.increment(inc);
+ result = region.get(new Get(row));
+ c = result.getColumnLatestCell(fam1, qual1);
+ assertEquals(c.getTimestamp(), 10L);
+ assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L);
+ }
+
+ @Test
+ public void testAppendTimestampsAreMonotonic() throws IOException {
+ HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
+ ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ edge.setValue(10);
+ Append a = new Append(row);
+ a.setDurability(Durability.SKIP_WAL);
+ a.add(fam1, qual1, qual1);
+ region.append(a);
+
+ Result result = region.get(new Get(row));
+ Cell c = result.getColumnLatestCell(fam1, qual1);
+ assertNotNull(c);
+ assertEquals(c.getTimestamp(), 10L);
+
+ edge.setValue(1); // clock goes back
+ region.append(a);
+ result = region.get(new Get(row));
+ c = result.getColumnLatestCell(fam1, qual1);
+ assertEquals(c.getTimestamp(), 10L);
+
+ byte[] expected = new byte[qual1.length*2];
+ System.arraycopy(qual1, 0, expected, 0, qual1.length);
+ System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
+
+ assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
+ expected, 0, expected.length));
+ }
+
+ @Test
+ public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
+ HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
+ ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ edge.setValue(10);
+ Put p = new Put(row);
+ p.setDurability(Durability.SKIP_WAL);
+ p.addColumn(fam1, qual1, qual1);
+ region.put(p);
+
+ Result result = region.get(new Get(row));
+ Cell c = result.getColumnLatestCell(fam1, qual1);
+ assertNotNull(c);
+ assertEquals(c.getTimestamp(), 10L);
+
+ edge.setValue(1); // clock goes back
+ p = new Put(row);
+ p.setDurability(Durability.SKIP_WAL);
+ p.addColumn(fam1, qual1, qual2);
+ region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false);
+ result = region.get(new Get(row));
+ c = result.getColumnLatestCell(fam1, qual1);
+ assertEquals(c.getTimestamp(), 10L);
+
+ assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
+ qual2, 0, qual2.length));
+ }
+
+ @Test
+ public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
+ HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
+ ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ edge.setValue(10);
+ Put p = new Put(row);
+ p.setDurability(Durability.SKIP_WAL);
+ p.addColumn(fam1, qual1, qual1);
+ region.put(p);
+
+ Result result = region.get(new Get(row));
+ Cell c = result.getColumnLatestCell(fam1, qual1);
+ assertNotNull(c);
+ assertEquals(c.getTimestamp(), 10L);
+
+ edge.setValue(1); // clock goes back
+ p = new Put(row);
+ p.setDurability(Durability.SKIP_WAL);
+ p.addColumn(fam1, qual1, qual2);
+ RowMutations rm = new RowMutations(row);
+ rm.add(p);
+ region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1),
+ rm, false);
+ result = region.get(new Get(row));
+ c = result.getColumnLatestCell(fam1, qual1);
+ assertEquals(c.getTimestamp(), 10L);
+
+ assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
+ qual2, 0, qual2.length));
+ }
+
static HRegion initHRegion(TableName tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),