You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/07 17:54:55 UTC
hbase git commit: HBASE-16992 The usage of mutation from CP is weird.
(ChiaPing Tsai)
Repository: hbase
Updated Branches:
refs/heads/master e60a7f6e7 -> 2182ca34a
HBASE-16992 The usage of mutation from CP is weird. (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2182ca34
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2182ca34
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2182ca34
Branch: refs/heads/master
Commit: 2182ca34ac935edd126e7ed17eb3237772d9d18b
Parents: e60a7f6
Author: anoopsamjohn <an...@gmail.com>
Authored: Mon Nov 7 23:24:34 2016 +0530
Committer: anoopsamjohn <an...@gmail.com>
Committed: Mon Nov 7 23:24:34 2016 +0530
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 9 +--
.../MiniBatchOperationInProgress.java | 2 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 72 ++++++++++++++++++++
3 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7a9d4e2..3423473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3202,10 +3202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
- if (cpMutation.getDurability() == Durability.SKIP_WAL) {
- recordMutationWithoutWal(cpFamilyMap);
- }
-
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later
@@ -3227,6 +3223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
+ // we use durability of the original mutation for the mutation passed by CP.
if (tmpDur == Durability.SKIP_WAL) {
recordMutationWithoutWal(m.getFamilyCellMap());
continue;
@@ -3324,6 +3321,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// We need to update the sequence id for following reasons.
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
// 2) If no WAL, FSWALEntry won't be used
+ // we use durability of the original mutation for the mutation passed by CP.
boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
this.updateSequenceId(familyMaps[i].values(),
@@ -7978,8 +7976,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int listSize = cells.size();
for (int i=0; i < listSize; i++) {
Cell cell = cells.get(i);
- // TODO we need include tags length also here.
- mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
+ mutationSize += KeyValueUtil.length(cell);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index cdbecac..1ab2ef5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -110,7 +110,7 @@ public class MiniBatchOperationInProgress<T> {
* Add more Mutations corresponding to the Mutation at the given index to be committed atomically
* in the same batch. These mutations are applied to the WAL and applied to the memstore as well.
* The timestamp of the cells in the given Mutations MUST be obtained from the original mutation.
- *
+ * <b>Note:</b> The durability from CP will be replaced by the durability of corresponding mutation.
* @param index the index that corresponds to the original mutation index in the batch
* @param newOperations the Mutations to add
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/2182ca34/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 3e5b127..6c63cff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
@@ -2414,6 +2415,77 @@ public class TestHRegion {
}
@Test
+ public void testDataInMemoryWithoutWAL() throws IOException {
+ FileSystem fs = FileSystem.get(CONF);
+ Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
+ FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
+ HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
+ CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
+
+ Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
+ System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
+ final long originalSize = KeyValueUtil.length(originalCell);
+
+ Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
+ System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
+ final long addSize = KeyValueUtil.length(addCell);
+
+ LOG.info("originalSize:" + originalSize
+ + ", addSize:" + addSize);
+ // start test. We expect that the addPut's durability will be replaced
+ // by originalPut's durability.
+
+ // case 1:
+ testDataInMemoryWithoutWAL(region,
+ new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
+ new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
+ originalSize + addSize);
+
+ // case 2:
+ testDataInMemoryWithoutWAL(region,
+ new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
+ new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
+ originalSize + addSize);
+
+ // case 3:
+ testDataInMemoryWithoutWAL(region,
+ new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
+ new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
+ 0);
+
+ // case 4:
+ testDataInMemoryWithoutWAL(region,
+ new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
+ new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
+ 0);
+ }
+
+ private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
+ final Put addPut, long delta) throws IOException {
+ final long initSize = region.getDataInMemoryWithoutWAL();
+ // save normalCPHost and replaced by mockedCPHost
+ RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
+ RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
+ Answer<Boolean> answer = new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0,
+ MiniBatchOperationInProgress.class);
+ mb.addOperationsFromCP(0, new Mutation[]{addPut});
+ return false;
+ }
+ };
+ when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)))
+ .then(answer);
+ region.setCoprocessorHost(mockedCPHost);
+ region.put(originalPut);
+ region.setCoprocessorHost(normalCPHost);
+ final long finalSize = region.getDataInMemoryWithoutWAL();
+ assertEquals("finalSize:" + finalSize + ", initSize:"
+ + initSize + ", delta:" + delta,finalSize, initSize + delta);
+ }
+
+ @Test
public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
Delete delete = new Delete(row);
delete.addColumns(fam1, qual1);