You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bi...@apache.org on 2020/04/16 08:31:51 UTC
[hbase] branch branch-1 updated: HBASE-24191
HRegion#processRowsWithLocks count memstore size wrong when sync failed
(#1518)
This is an automated email from the ASF dual-hosted git repository.
binlijin pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 589d075 HBASE-24191 HRegion#processRowsWithLocks count memstore size wrong when sync failed (#1518)
589d075 is described below
commit 589d075de80cab2a0d2e5e3ae7bd078c8cae1d69
Author: binlijin <bi...@gmail.com>
AuthorDate: Thu Apr 16 16:31:38 2020 +0800
HBASE-24191 HRegion#processRowsWithLocks count memstore size wrong when sync failed (#1518)
Signed-off-by: Anoop Sam John <an...@apache.org>
---
.../apache/hadoop/hbase/regionserver/HRegion.java | 2 +-
.../TestRegionProcessRowsWithLocks.java | 321 +++++++++++++++++++++
2 files changed, 322 insertions(+), 1 deletion(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9ff2788..8ef3865 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7717,7 +7717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
closeRegionOperation();
- if (!mutations.isEmpty() &&
+ if (!mutations.isEmpty() && walSyncSuccessful &&
isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
requestFlush();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionProcessRowsWithLocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionProcessRowsWithLocks.java
new file mode 100644
index 0000000..227faf9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionProcessRowsWithLocks.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.coprocessor.BaseRowProcessorEndpoint;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test HRegion#processRowsWithLocks
+ */
+@Category(MediumTests.class)
+public class TestRegionProcessRowsWithLocks {
+
+ private static final Log LOG = LogFactory.getLog(TestRegionProcessRowsWithLocks.class);
+
+ private static final TableName TABLE = TableName.valueOf("testtable");
+ private final static byte[] ROW = Bytes.toBytes("testrow");
+ private final static byte[] FAM = Bytes.toBytes("friendlist");
+
+ // Column names
+ private final static byte[] A = Bytes.toBytes("a");
+ private final static byte[] B = Bytes.toBytes("b");
+ private final static byte[] C = Bytes.toBytes("c");
+ private final static byte[] D = Bytes.toBytes("d");
+ private final static byte[] E = Bytes.toBytes("e");
+ private final static byte[] F = Bytes.toBytes("f");
+ private final static byte[] G = Bytes.toBytes("g");
+ private final static byte[] COUNTER = Bytes.toBytes("counter");
+
+ private static HBaseTestingUtility util = new HBaseTestingUtility();
+ private static volatile int expectedCounter = 0;
+
+ private volatile static Table table = null;
+ private static final AtomicBoolean throwsException = new AtomicBoolean(false);
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = util.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ RowProcessorEndpoint.class.getName());
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
+ util.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ public void prepareTestData() throws Exception {
+ try {
+ util.getHBaseAdmin().disableTable(TABLE);
+ util.getHBaseAdmin().deleteTable(TABLE);
+ } catch (Exception e) {
+ // ignore table not found
+ }
+ table = util.createTable(TABLE, FAM);
+ {
+ Put put = new Put(ROW);
+ put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
+ put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
+ put.add(FAM, C, G); // G is a friend of C
+ table.put(put);
+ }
+ expectedCounter = 0;
+ }
+
+ @Test
+ public void testProcessNormal() throws Throwable {
+ prepareTestData();
+ List<HRegion> regions = util.getHBaseCluster().getRegions(TABLE);
+ HRegion region = regions.get(0);
+ long startMemstoreSize = region.getMemstoreSize();
+ long startFlushableSize = region.getStore(FAM).getFlushableSize();
+ int finalCounter = incrementCounter(table);
+ assertEquals(expectedCounter, finalCounter);
+ Get get = new Get(ROW);
+ Result result = table.get(get);
+ LOG.debug("row keyvalues:" + stringifyKvs(result.listCells()));
+ int getR = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(FAM, COUNTER)));
+ assertEquals(expectedCounter, getR);
+
+ long endMemstoreSize = region.getMemstoreSize();
+ long endFlushableSize = region.getStore(FAM).getFlushableSize();
+ Assert.assertEquals("Should equal.", (endMemstoreSize - startMemstoreSize),
+ (endFlushableSize - startFlushableSize));
+ }
+
+ @Test
+ public void testProcessExceptionAndRollBack() throws Throwable {
+ prepareTestData();
+ List<HRegion> regions = util.getHBaseCluster().getRegions(TABLE);
+ HRegion region = regions.get(0);
+ long startMemstoreSize = region.getMemstoreSize();
+ long startFlushableSize = region.getStore(FAM).getFlushableSize();
+ WAL wal = region.getWAL();
+ wal.registerWALActionsListener(new WALActionsListener.Base() {
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException {
+ if (throwsException.get()) {
+ throwsException.set(false);
+ throw new IOException("throw test IOException");
+ }
+ }
+ });
+ try {
+ incrementCounter(table);
+ Assert.fail("Should throw IOException.");
+ } catch (Throwable e) {
+ }
+
+ long endMemstoreSize = region.getMemstoreSize();
+ long endFlushableSize = region.getStore(FAM).getFlushableSize();
+ LOG.info(
+ "MemstoreSize deta=" + (endMemstoreSize - startMemstoreSize) + ",FlushableSize deta=" + (
+ endFlushableSize - startFlushableSize));
+ Assert.assertEquals("Should equal.", (endMemstoreSize - startMemstoreSize),
+ (endFlushableSize - startFlushableSize));
+ }
+
+ private int incrementCounter(Table table) throws Throwable {
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+ RowProcessorEndpoint.IncrementCounterProcessor processor =
+ new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
+ RowProcessorService.BlockingInterface service = RowProcessorService.newBlockingStub(channel);
+ ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ ProcessResponse protoResult = service.process(null, request);
+ IncCounterProcessorResponse response =
+ IncCounterProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+ Integer result = response.getResponse();
+ return result;
+ }
+
+ /**
+ * This class defines two RowProcessors:
+ * IncrementCounterProcessor and FriendsOfFriendsProcessor.
+ * We define the RowProcessors as the inner class of the endpoint.
+ * So they can be loaded with the endpoint on the coprocessor.
+ */
+ public static class RowProcessorEndpoint<S extends Message, T extends Message>
+ extends BaseRowProcessorEndpoint<S, T> implements CoprocessorService {
+ public static class IncrementCounterProcessor
+ extends BaseRowProcessor<IncCounterProcessorRequest, IncCounterProcessorResponse> {
+ int counter = 0;
+ byte[] row = new byte[0];
+
+ /**
+ * Empty constructor for Writable
+ */
+ public IncrementCounterProcessor() {
+ }
+
+ public IncrementCounterProcessor(byte[] row) {
+ this.row = row;
+ }
+
+ @Override
+ public Collection<byte[]> getRowsToLock() {
+ return Collections.singleton(row);
+ }
+
+ @Override
+ public IncCounterProcessorResponse getResult() {
+ IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
+ i.setResponse(counter);
+ return i.build();
+ }
+
+ @Override
+ public boolean readOnly() {
+ return false;
+ }
+
+ @Override
+ public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit)
+ throws IOException {
+ // Scan current counter
+ List<Cell> kvs = new ArrayList<Cell>();
+ Scan scan = new Scan(row, row);
+ scan.addColumn(FAM, COUNTER);
+ doScan(region, scan, kvs);
+ LOG.info("kvs.size()="+kvs.size());
+ counter = kvs.size() == 0 ? 0 : Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
+ LOG.info("counter=" + counter);
+
+ // Assert counter value
+ assertEquals(expectedCounter, counter);
+
+ // Increment counter and send it to both memstore and wal edit
+ counter += 1;
+ expectedCounter += 1;
+
+ Put p = new Put(row);
+ KeyValue kv = new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
+ p.add(kv);
+ mutations.add(p);
+ walEdit.add(kv);
+
+ // We can also inject some meta data to the walEdit
+ KeyValue metaKv =
+ new KeyValue(row, WALEdit.METAFAMILY, Bytes.toBytes("I just increment counter"),
+ Bytes.toBytes(counter));
+ walEdit.add(metaKv);
+ throwsException.set(true);
+ }
+
+ @Override
+ public IncCounterProcessorRequest getRequestData() throws IOException {
+ IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
+ builder.setCounter(counter);
+ builder.setRow(ByteStringer.wrap(row));
+ return builder.build();
+ }
+
+ @Override
+ public void initialize(IncCounterProcessorRequest msg) {
+ this.row = msg.getRow().toByteArray();
+ this.counter = msg.getCounter();
+ }
+ }
+
+ public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
+ InternalScanner scanner = null;
+ try {
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+ scanner = region.getScanner(scan);
+ result.clear();
+ scanner.next(result);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ }
+ }
+
+ static String stringifyKvs(Collection<Cell> kvs) {
+ StringBuilder out = new StringBuilder();
+ out.append("[");
+ if (kvs != null) {
+ for (Cell kv : kvs) {
+ byte[] col = CellUtil.cloneQualifier(kv);
+ byte[] val = CellUtil.cloneValue(kv);
+ if (Bytes.equals(col, COUNTER)) {
+ out.append(Bytes.toStringBinary(col) + ":" + Bytes.toInt(val) + " ");
+ } else {
+ out.append(Bytes.toStringBinary(col) + ":" + Bytes.toStringBinary(val) + " ");
+ }
+ }
+ }
+ out.append("]");
+ return out.toString();
+ }
+
+}