You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2021/03/27 22:39:34 UTC
[hbase] branch master updated: HBASE-25702 Remove RowProcessor (#3097)
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 93b1163 HBASE-25702 Remove RowProcessor (#3097)
93b1163 is described below
commit 93b1163a8bc759e2121cd25d8e209cc69c9c9e74
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Sun Mar 28 07:38:42 2021 +0900
HBASE-25702 Remove RowProcessor (#3097)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../coprocessor/TestRowProcessorEndpoint.java | 679 ---------------------
.../protobuf/server/coprocessor/RowProcessor.proto | 46 --
.../client/coprocessor/RowProcessorClient.java | 53 --
.../coprocessor/BaseRowProcessorEndpoint.java | 149 -----
.../hbase/regionserver/BaseRowProcessor.java | 71 ---
.../apache/hadoop/hbase/regionserver/HRegion.java | 257 --------
.../apache/hadoop/hbase/regionserver/Region.java | 47 +-
.../hadoop/hbase/regionserver/RowProcessor.java | 159 -----
8 files changed, 1 insertion(+), 1460 deletions(-)
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
deleted file mode 100644
index ebdbc3d..0000000
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.coprocessor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.IsolationLevel;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
-import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.RowProcessorService;
-
-/**
- * Verifies ProcessEndpoint works.
- * The tested RowProcessor performs two scans and a read-modify-write.
- */
-@Category({CoprocessorTests.class, MediumTests.class})
-public class TestRowProcessorEndpoint {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestRowProcessorEndpoint.class);
-
- private static final TableName TABLE = TableName.valueOf("testtable");
- private final static byte[] ROW = Bytes.toBytes("testrow");
- private final static byte[] ROW2 = Bytes.toBytes("testrow2");
- private final static byte[] FAM = Bytes.toBytes("friendlist");
-
- // Column names
- private final static byte[] A = Bytes.toBytes("a");
- private final static byte[] B = Bytes.toBytes("b");
- private final static byte[] C = Bytes.toBytes("c");
- private final static byte[] D = Bytes.toBytes("d");
- private final static byte[] E = Bytes.toBytes("e");
- private final static byte[] F = Bytes.toBytes("f");
- private final static byte[] G = Bytes.toBytes("g");
- private final static byte[] COUNTER = Bytes.toBytes("counter");
- private final static AtomicLong myTimer = new AtomicLong(0);
- private final AtomicInteger failures = new AtomicInteger(0);
-
- private static HBaseTestingUtility util = new HBaseTestingUtility();
- private static volatile int expectedCounter = 0;
- private static int rowSize, row2Size;
-
- private volatile static Table table = null;
- private volatile static boolean swapped = false;
- private volatile CountDownLatch startSignal;
- private volatile CountDownLatch doneSignal;
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- Configuration conf = util.getConfiguration();
- conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- RowProcessorEndpoint.class.getName());
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
- conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
- util.startMiniCluster();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- util.shutdownMiniCluster();
- }
-
- public void prepareTestData() throws Exception {
- try {
- util.getAdmin().disableTable(TABLE);
- util.getAdmin().deleteTable(TABLE);
- } catch (Exception e) {
- // ignore table not found
- }
- table = util.createTable(TABLE, FAM);
- {
- Put put = new Put(ROW);
- put.addColumn(FAM, A, Bytes.add(B, C)); // B, C are friends of A
- put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
- put.addColumn(FAM, C, G); // G is a friend of C
- table.put(put);
- rowSize = put.size();
- }
- Put put = new Put(ROW2);
- put.addColumn(FAM, D, E);
- put.addColumn(FAM, F, G);
- table.put(put);
- row2Size = put.size();
- }
-
- @Test
- public void testDoubleScan() throws Throwable {
- prepareTestData();
-
- CoprocessorRpcChannel channel = table.coprocessorService(ROW);
- RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
- new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
- RowProcessorService.BlockingInterface service =
- RowProcessorService.newBlockingStub(channel);
- ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
- ProcessResponse protoResult = service.process(null, request);
- FriendsOfFriendsProcessorResponse response =
- FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
- Set<String> result = new HashSet<>();
- result.addAll(response.getResultList());
- Set<String> expected = new HashSet<>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
- Get get = new Get(ROW);
- LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
- assertEquals(expected, result);
- }
-
- @Test
- public void testReadModifyWrite() throws Throwable {
- prepareTestData();
- failures.set(0);
- int numThreads = 100;
- concurrentExec(new IncrementRunner(), numThreads);
- Get get = new Get(ROW);
- LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
- int finalCounter = incrementCounter(table);
- int failureNumber = failures.get();
- if (failureNumber > 0) {
- LOG.debug("We failed " + failureNumber + " times during test");
- }
- assertEquals(numThreads + 1 - failureNumber, finalCounter);
- }
-
- class IncrementRunner implements Runnable {
- @Override
- public void run() {
- try {
- incrementCounter(table);
- } catch (Throwable e) {
- failures.incrementAndGet();
- e.printStackTrace();
- }
- }
- }
-
- private int incrementCounter(Table table) throws Throwable {
- CoprocessorRpcChannel channel = table.coprocessorService(ROW);
- RowProcessorEndpoint.IncrementCounterProcessor processor =
- new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
- RowProcessorService.BlockingInterface service =
- RowProcessorService.newBlockingStub(channel);
- ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
- ProcessResponse protoResult = service.process(null, request);
- IncCounterProcessorResponse response = IncCounterProcessorResponse
- .parseFrom(protoResult.getRowProcessorResult());
- Integer result = response.getResponse();
- return result;
- }
-
- private void concurrentExec(final Runnable task, final int numThreads) throws Throwable {
- startSignal = new CountDownLatch(numThreads);
- doneSignal = new CountDownLatch(numThreads);
- for (int i = 0; i < numThreads; ++i) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- startSignal.countDown();
- startSignal.await();
- task.run();
- } catch (Throwable e) {
- failures.incrementAndGet();
- e.printStackTrace();
- }
- doneSignal.countDown();
- }
- }).start();
- }
- doneSignal.await();
- }
-
- @Test
- public void testMultipleRows() throws Throwable {
- prepareTestData();
- failures.set(0);
- int numThreads = 100;
- concurrentExec(new SwapRowsRunner(), numThreads);
- LOG.debug("row keyvalues:" +
- stringifyKvs(table.get(new Get(ROW)).listCells()));
- LOG.debug("row2 keyvalues:" +
- stringifyKvs(table.get(new Get(ROW2)).listCells()));
- int failureNumber = failures.get();
- if (failureNumber > 0) {
- LOG.debug("We failed " + failureNumber + " times during test");
- }
- if (!swapped) {
- assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
- assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
- } else {
- assertEquals(rowSize, table.get(new Get(ROW2)).listCells().size());
- assertEquals(row2Size, table.get(new Get(ROW)).listCells().size());
- }
- }
-
- class SwapRowsRunner implements Runnable {
- @Override
- public void run() {
- try {
- swapRows(table);
- } catch (Throwable e) {
- failures.incrementAndGet();
- e.printStackTrace();
- }
- }
- }
-
- private void swapRows(Table table) throws Throwable {
- CoprocessorRpcChannel channel = table.coprocessorService(ROW);
- RowProcessorEndpoint.RowSwapProcessor processor =
- new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
- RowProcessorService.BlockingInterface service =
- RowProcessorService.newBlockingStub(channel);
- ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
- service.process(null, request);
- }
-
- @Test
- public void testTimeout() throws Throwable {
- prepareTestData();
- CoprocessorRpcChannel channel = table.coprocessorService(ROW);
- RowProcessorEndpoint.TimeoutProcessor processor =
- new RowProcessorEndpoint.TimeoutProcessor(ROW);
- RowProcessorService.BlockingInterface service =
- RowProcessorService.newBlockingStub(channel);
- ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
- boolean exceptionCaught = false;
- try {
- service.process(null, request);
- } catch (Exception e) {
- exceptionCaught = true;
- }
- assertTrue(exceptionCaught);
- }
-
- /**
- * This class defines two RowProcessors:
- * IncrementCounterProcessor and FriendsOfFriendsProcessor.
- *
- * We define the RowProcessors as the inner class of the endpoint.
- * So they can be loaded with the endpoint on the coprocessor.
- */
- public static class RowProcessorEndpoint<S extends Message,T extends Message>
- extends BaseRowProcessorEndpoint<S,T> {
- public static class IncrementCounterProcessor extends
- BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
- IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
- int counter = 0;
- byte[] row = new byte[0];
-
- /**
- * Empty constructor for Writable
- */
- IncrementCounterProcessor() {
- }
-
- IncrementCounterProcessor(byte[] row) {
- this.row = row;
- }
-
- @Override
- public Collection<byte[]> getRowsToLock() {
- return Collections.singleton(row);
- }
-
- @Override
- public IncCounterProcessorResponse getResult() {
- IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
- i.setResponse(counter);
- return i.build();
- }
-
- @Override
- public boolean readOnly() {
- return false;
- }
-
- @Override
- public void process(long now, HRegion region,
- List<Mutation> mutations, WALEdit walEdit) throws IOException {
- // Scan current counter
- List<Cell> kvs = new ArrayList<>();
- Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
- scan.addColumn(FAM, COUNTER);
- doScan(region, scan, kvs);
- counter = kvs.isEmpty() ? 0 :
- Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
-
- // Assert counter value
- assertEquals(expectedCounter, counter);
-
- // Increment counter and send it to both memstore and wal edit
- counter += 1;
- expectedCounter += 1;
-
-
- Put p = new Put(row);
- KeyValue kv =
- new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
- p.add(kv);
- mutations.add(p);
- walEdit.add(kv);
-
- // We can also inject some meta data to the walEdit
- KeyValue metaKv = new KeyValue(
- row, WALEdit.METAFAMILY,
- Bytes.toBytes("I just increment counter"),
- Bytes.toBytes(counter));
- walEdit.add(metaKv);
- }
-
- @Override
- public IncCounterProcessorRequest getRequestData() throws IOException {
- IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
- builder.setCounter(counter);
- builder.setRow(UnsafeByteOperations.unsafeWrap(row));
- return builder.build();
- }
-
- @Override
- public void initialize(IncCounterProcessorRequest msg) {
- this.row = msg.getRow().toByteArray();
- this.counter = msg.getCounter();
- }
- }
-
- public static class FriendsOfFriendsProcessor extends
- BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
- byte[] row = null;
- byte[] person = null;
- final Set<String> result = new HashSet<>();
-
- /**
- * Empty constructor for Writable
- */
- FriendsOfFriendsProcessor() {
- }
-
- FriendsOfFriendsProcessor(byte[] row, byte[] person) {
- this.row = row;
- this.person = person;
- }
-
- @Override
- public Collection<byte[]> getRowsToLock() {
- return Collections.singleton(row);
- }
-
- @Override
- public FriendsOfFriendsProcessorResponse getResult() {
- FriendsOfFriendsProcessorResponse.Builder builder =
- FriendsOfFriendsProcessorResponse.newBuilder();
- builder.addAllResult(result);
- return builder.build();
- }
-
- @Override
- public boolean readOnly() {
- return true;
- }
-
- @Override
- public void process(long now, HRegion region,
- List<Mutation> mutations, WALEdit walEdit) throws IOException {
- List<Cell> kvs = new ArrayList<>();
- { // First scan to get friends of the person
- Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
- scan.addColumn(FAM, person);
- doScan(region, scan, kvs);
- }
-
- // Second scan to get friends of friends
- Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
- for (Cell kv : kvs) {
- byte[] friends = CellUtil.cloneValue(kv);
- for (byte f : friends) {
- scan.addColumn(FAM, new byte[]{f});
- }
- }
- doScan(region, scan, kvs);
-
- // Collect result
- result.clear();
- for (Cell kv : kvs) {
- for (byte b : CellUtil.cloneValue(kv)) {
- result.add((char)b + "");
- }
- }
- }
-
- @Override
- public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
- FriendsOfFriendsProcessorRequest.Builder builder =
- FriendsOfFriendsProcessorRequest.newBuilder();
- builder.setPerson(UnsafeByteOperations.unsafeWrap(person));
- builder.setRow(UnsafeByteOperations.unsafeWrap(row));
- builder.addAllResult(result);
- FriendsOfFriendsProcessorRequest f = builder.build();
- return f;
- }
-
- @Override
- public void initialize(FriendsOfFriendsProcessorRequest request)
- throws IOException {
- this.person = request.getPerson().toByteArray();
- this.row = request.getRow().toByteArray();
- result.clear();
- result.addAll(request.getResultList());
- }
- }
-
- public static class RowSwapProcessor extends
- BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
- byte[] row1 = new byte[0];
- byte[] row2 = new byte[0];
-
- /**
- * Empty constructor for Writable
- */
- RowSwapProcessor() {
- }
-
- RowSwapProcessor(byte[] row1, byte[] row2) {
- this.row1 = row1;
- this.row2 = row2;
- }
-
- @Override
- public Collection<byte[]> getRowsToLock() {
- List<byte[]> rows = new ArrayList<>(2);
- rows.add(row1);
- rows.add(row2);
- return rows;
- }
-
- @Override
- public boolean readOnly() {
- return false;
- }
-
- @Override
- public RowSwapProcessorResponse getResult() {
- return RowSwapProcessorResponse.getDefaultInstance();
- }
-
- @Override
- public void process(long now, HRegion region,
- List<Mutation> mutations, WALEdit walEdit) throws IOException {
-
- // Override the time to avoid race-condition in the unit test caused by
- // inacurate timer on some machines
- now = myTimer.getAndIncrement();
-
- // Scan both rows
- List<Cell> kvs1 = new ArrayList<>();
- List<Cell> kvs2 = new ArrayList<>();
- doScan(region, new Scan().withStartRow(row1).withStopRow(row1), kvs1);
- doScan(region, new Scan().withStartRow(row2).withStopRow(row2), kvs2);
-
- // Assert swapped
- if (swapped) {
- assertEquals(rowSize, kvs2.size());
- assertEquals(row2Size, kvs1.size());
- } else {
- assertEquals(rowSize, kvs1.size());
- assertEquals(row2Size, kvs2.size());
- }
- swapped = !swapped;
-
- // Add and delete keyvalues
- List<List<Cell>> kvs = new ArrayList<>(2);
- kvs.add(kvs1);
- kvs.add(kvs2);
- byte[][] rows = new byte[][]{row1, row2};
- for (int i = 0; i < kvs.size(); ++i) {
- for (Cell kv : kvs.get(i)) {
- // Delete from the current row and add to the other row
- Delete d = new Delete(rows[i]);
- KeyValue kvDelete =
- new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
- kv.getTimestamp(), KeyValue.Type.Delete);
- d.add(kvDelete);
- Put p = new Put(rows[1 - i]);
- KeyValue kvAdd =
- new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
- now, CellUtil.cloneValue(kv));
- p.add(kvAdd);
- mutations.add(d);
- walEdit.add(kvDelete);
- mutations.add(p);
- walEdit.add(kvAdd);
- }
- }
- }
-
- @Override
- public String getName() {
- return "swap";
- }
-
- @Override
- public RowSwapProcessorRequest getRequestData() throws IOException {
- RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
- builder.setRow1(UnsafeByteOperations.unsafeWrap(row1));
- builder.setRow2(UnsafeByteOperations.unsafeWrap(row2));
- return builder.build();
- }
-
- @Override
- public void initialize(RowSwapProcessorRequest msg) {
- this.row1 = msg.getRow1().toByteArray();
- this.row2 = msg.getRow2().toByteArray();
- }
- }
-
- public static class TimeoutProcessor extends
- BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
- byte[] row = new byte[0];
-
- /**
- * Empty constructor for Writable
- */
- public TimeoutProcessor() {
- }
-
- public TimeoutProcessor(byte[] row) {
- this.row = row;
- }
-
- public Collection<byte[]> getRowsToLock() {
- return Collections.singleton(row);
- }
-
- @Override
- public TimeoutProcessorResponse getResult() {
- return TimeoutProcessorResponse.getDefaultInstance();
- }
-
- @Override
- public void process(long now, HRegion region,
- List<Mutation> mutations, WALEdit walEdit) throws IOException {
- try {
- // Sleep for a long time so it timeout
- Thread.sleep(100 * 1000L);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean readOnly() {
- return true;
- }
-
- @Override
- public String getName() {
- return "timeout";
- }
-
- @Override
- public TimeoutProcessorRequest getRequestData() throws IOException {
- TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
- builder.setRow(UnsafeByteOperations.unsafeWrap(row));
- return builder.build();
- }
-
- @Override
- public void initialize(TimeoutProcessorRequest msg) throws IOException {
- this.row = msg.getRow().toByteArray();
- }
- }
-
- public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
- InternalScanner scanner = null;
- try {
- scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
- scanner = region.getScanner(scan);
- result.clear();
- scanner.next(result);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- }
- }
-
- static String stringifyKvs(Collection<Cell> kvs) {
- StringBuilder out = new StringBuilder();
- out.append("[");
- if (kvs != null) {
- for (Cell kv : kvs) {
- byte[] col = CellUtil.cloneQualifier(kv);
- byte[] val = CellUtil.cloneValue(kv);
- if (Bytes.equals(col, COUNTER)) {
- out.append(Bytes.toStringBinary(col) + ":" +
- Bytes.toInt(val) + " ");
- } else {
- out.append(Bytes.toStringBinary(col) + ":" +
- Bytes.toStringBinary(val) + " ");
- }
- }
- }
- out.append("]");
- return out.toString();
- }
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/coprocessor/RowProcessor.proto b/hbase-protocol-shaded/src/main/protobuf/server/coprocessor/RowProcessor.proto
deleted file mode 100644
index 29d056c..0000000
--- a/hbase-protocol-shaded/src/main/protobuf/server/coprocessor/RowProcessor.proto
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines a protocol to perform multi row transactions.
- * See BaseRowProcessorEndpoint for the implementation.
- * See HRegion#processRowsWithLocks() for details.
- */
-syntax = "proto2";
-package hbase.pb;
-
-option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
-option java_outer_classname = "RowProcessorProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-message ProcessRequest {
- required string row_processor_class_name = 1;
- optional string row_processor_initializer_message_name = 2;
- optional bytes row_processor_initializer_message = 3;
- optional uint64 nonce_group = 4;
- optional uint64 nonce = 5;
-}
-
-message ProcessResponse {
- required bytes row_processor_result = 1;
-}
-
-service RowProcessorService {
- rpc Process(ProcessRequest) returns (ProcessResponse);
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java
deleted file mode 100644
index b35a092..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client.coprocessor;
-
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RowProcessor;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
-
-/**
- * Convenience class that is used to make RowProcessorEndpoint invocations.
- * For example usage, refer TestRowProcessorEndpoint
- *
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
-public class RowProcessorClient {
- public static <S extends Message, T extends Message>
- ProcessRequest getRowProcessorPB(RowProcessor<S,T> r)
- throws IOException {
- final ProcessRequest.Builder requestBuilder =
- ProcessRequest.newBuilder();
- requestBuilder.setRowProcessorClassName(r.getClass().getName());
- S s = r.getRequestData();
- if (s != null) {
- requestBuilder.setRowProcessorInitializerMessageName(s.getClass().getName());
- requestBuilder.setRowProcessorInitializerMessage(s.toByteString());
- }
- return requestBuilder.build();
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
deleted file mode 100644
index 5eb6e75..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.coprocessor;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collections;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RowProcessor;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.Service;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.RowProcessorService;
-
-
-/**
- * This class demonstrates how to implement atomic read-modify-writes
- * using {@link Region#processRowsWithLocks} and Coprocessor endpoints.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
-public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
-extends RowProcessorService implements RegionCoprocessor {
- private RegionCoprocessorEnvironment env;
- /**
- * Pass a processor to region to process multiple rows atomically.
- *
- * The RowProcessor implementations should be the inner classes of your
- * RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
- * the Coprocessor endpoint together.
- *
- * See {@code TestRowProcessorEndpoint} for example.
- *
- * The request contains information for constructing the processor
- * (see {@link #constructRowProcessorFromRequest}). The processor object defines
- * the read-modify-write procedure.
- */
- @Override
- public void process(RpcController controller, ProcessRequest request,
- RpcCallback<ProcessResponse> done) {
- ProcessResponse resultProto = null;
- try {
- RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
- Region region = env.getRegion();
- long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
- long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
- region.processRowsWithLocks(processor, nonceGroup, nonce);
- T result = processor.getResult();
- ProcessResponse.Builder b = ProcessResponse.newBuilder();
- b.setRowProcessorResult(result.toByteString());
- resultProto = b.build();
- } catch (Exception e) {
- CoprocessorRpcUtils.setControllerException(controller, new IOException(e));
- }
- done.run(resultProto);
- }
-
- @Override
- public Iterable<Service> getServices() {
- return Collections.singleton(this);
- }
-
- /**
- * Stores a reference to the coprocessor environment provided by the
- * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
- * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
- * on a table region, so always expects this to be an instance of
- * {@link RegionCoprocessorEnvironment}.
- * @param env the environment provided by the coprocessor host
- * @throws IOException if the provided environment is not an instance of
- * {@code RegionCoprocessorEnvironment}
- */
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- if (env instanceof RegionCoprocessorEnvironment) {
- this.env = (RegionCoprocessorEnvironment)env;
- } else {
- throw new CoprocessorException("Must be loaded on a table region!");
- }
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- // nothing to do
- }
-
- @SuppressWarnings("unchecked")
- RowProcessor<S,T> constructRowProcessorFromRequest(ProcessRequest request)
- throws IOException {
- String className = request.getRowProcessorClassName();
- Class<?> cls;
- try {
- cls = Class.forName(className);
- RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.getDeclaredConstructor().newInstance();
- if (request.hasRowProcessorInitializerMessageName()) {
- Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
- .asSubclass(Message.class);
- Method m;
- try {
- m = imn.getMethod("parseFrom", ByteString.class);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- }
- S s;
- try {
- s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- ci.initialize(s);
- }
- return ci;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
deleted file mode 100644
index 3206aaa..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.UUID;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
-/**
- * Base class for RowProcessor with some default implementations.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
-public abstract class BaseRowProcessor<S extends Message,T extends Message>
-implements RowProcessor<S,T> {
-
- @Override
- public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
- }
-
- @Override
- public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
- }
-
- @Override
- public void postBatchMutate(HRegion region) throws IOException {
- }
-
- @Override
- public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
- }
-
- @Override
- public List<UUID> getClusterIds() {
- return Collections.emptyList();
- }
-
- @Override
- public String getName() {
- return this.getClass().getSimpleName().toLowerCase(Locale.ROOT);
- }
-
- @Override
- public Durability useDurability() {
- return Durability.USE_DEFAULT;
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 85b2eae..9da7f7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -55,14 +55,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
@@ -387,10 +383,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Number of mutations for minibatch processing.
private final int miniBatchSize;
- // negative number indicates infinite timeout
- static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
- final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
-
final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
@@ -676,7 +668,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long memstoreFlushSize;
final long timestampSlop;
- final long rowProcessorTimeout;
// Last flush time for each Store. Useful when we are flushing for each column
private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
@@ -846,13 +837,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
HConstants.LATEST_TIMESTAMP);
- /**
- * Timeout for the process time in processRowsWithLocks().
- * Use -1 to switch off time bound.
- */
- this.rowProcessorTimeout = conf.getLong(
- "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
-
this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
boolean forceSync = conf.getBoolean(WAL_HSYNC_CONF_KEY, DEFAULT_WAL_HSYNC);
@@ -5124,7 +5108,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
* set; when set we will run operations that make sense in the increment/append scenario
* but that do not make sense otherwise.
- * @see #applyToMemStore(HStore, Cell, MemStoreSizing)
*/
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSizing memstoreAccounting) throws IOException {
@@ -5137,19 +5120,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /**
- * @see #applyToMemStore(HStore, List, boolean, MemStoreSizing)
- */
- private void applyToMemStore(HStore store, Cell cell, MemStoreSizing memstoreAccounting)
- throws IOException {
- // Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
- if (store == null) {
- checkFamily(CellUtil.cloneFamily(cell));
- // Unreachable because checkFamily will throw exception
- }
- store.add(cell, memstoreAccounting);
- }
-
private void checkFamilies(Collection<byte[]> families, Durability durability)
throws NoSuchColumnFamilyException, InvalidMutationDurabilityException {
for (byte[] family : families) {
@@ -7674,227 +7644,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
- processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE);
- }
-
- @Override
- public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
- throws IOException {
- processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
- }
-
- @Override
- public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
- long nonceGroup, long nonce) throws IOException {
- for (byte[] row : processor.getRowsToLock()) {
- checkRow(row, "processRowsWithLocks");
- }
- if (!processor.readOnly()) {
- checkReadOnly();
- }
- checkResources();
- startRegionOperation();
- WALEdit walEdit = new WALEdit();
-
- // STEP 1. Run pre-process hook
- preProcess(processor, walEdit);
- // Short circuit the read only case
- if (processor.readOnly()) {
- try {
- long now = EnvironmentEdgeManager.currentTime();
- doProcessRowWithTimeout(processor, now, this, null, null, timeout);
- processor.postProcess(this, walEdit, true);
- } finally {
- closeRegionOperation();
- }
- return;
- }
-
- boolean locked = false;
- List<RowLock> acquiredRowLocks = null;
- List<Mutation> mutations = new ArrayList<>();
- Collection<byte[]> rowsToLock = processor.getRowsToLock();
- // This is assigned by mvcc either explicitly below or in the guts of the WAL append
- // when it assigns the edit a sequence id (a.k.a. the mvcc write number).
- WriteEntry writeEntry = null;
- MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
-
- // Check for thread interrupt status in case we have been signaled from
- // #interruptRegionOperation.
- checkInterrupt();
-
- try {
- boolean success = false;
- try {
- // STEP 2. Acquire the row lock(s)
- acquiredRowLocks = new ArrayList<>(rowsToLock.size());
- RowLock prevRowLock = null;
- for (byte[] row : rowsToLock) {
- // Attempt to lock all involved rows, throw if any lock times out
- // use a writer lock for mixed reads and writes
- RowLock rowLock = getRowLockInternal(row, false, prevRowLock);
- if (rowLock != prevRowLock) {
- acquiredRowLocks.add(rowLock);
- prevRowLock = rowLock;
- }
- }
-
- // Check for thread interrupt status in case we have been signaled from
- // #interruptRegionOperation. Do it before we take the lock and disable interrupts for
- // the WAL append.
- checkInterrupt();
-
- // STEP 3. Region lock
- lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
- locked = true;
-
- // From this point until memstore update this operation should not be interrupted.
- disableInterrupts();
-
- long now = EnvironmentEdgeManager.currentTime();
- // STEP 4. Let the processor scan the rows, generate mutations and add waledits
- doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
- if (!mutations.isEmpty()) {
- writeRequestsCount.add(mutations.size());
- // STEP 5. Call the preBatchMutate hook
- processor.preBatchMutate(this, walEdit);
-
- // STEP 6. Append and sync if walEdit has data to write out.
- if (!walEdit.isEmpty()) {
- writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
- processor.getClusterIds(), now, nonceGroup, nonce);
- } else {
- // We are here if WAL is being skipped.
- writeEntry = this.mvcc.begin();
- }
-
- // STEP 7. Apply to memstore
- long sequenceId = writeEntry.getWriteNumber();
- for (Mutation m : mutations) {
- // Handle any tag based cell features.
- // TODO: Do we need to call rewriteCellTags down in applyToMemStore()? Why not before
- // so tags go into WAL?
- rewriteCellTags(m.getFamilyCellMap(), m);
- for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
- Cell cell = cellScanner.current();
- if (walEdit.isEmpty()) {
- // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
- // If no WAL, need to stamp it here.
- PrivateCellUtil.setSequenceId(cell, sequenceId);
- }
- applyToMemStore(getStore(cell), cell, memstoreAccounting);
- }
- }
-
- // STEP 8. call postBatchMutate hook
- processor.postBatchMutate(this);
-
- // STEP 9. Complete mvcc.
- mvcc.completeAndWait(writeEntry);
- writeEntry = null;
-
- // STEP 10. Release region lock
- if (locked) {
- this.updatesLock.readLock().unlock();
- locked = false;
- }
-
- // STEP 11. Release row lock(s)
- releaseRowLocks(acquiredRowLocks);
-
- if (rsServices != null && rsServices.getMetrics() != null) {
- rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
- getTableName(), mutations.size());
- }
- }
- success = true;
- } finally {
- // Call complete rather than completeAndWait because we probably hit an error if writeEntry != null
- if (writeEntry != null) mvcc.complete(writeEntry);
- if (locked) {
- this.updatesLock.readLock().unlock();
- }
- // release locks if some were acquired but another timed out
- releaseRowLocks(acquiredRowLocks);
-
- enableInterrupts();
- }
-
- // 12. Run post-process hook
- processor.postProcess(this, walEdit, success);
- } finally {
- closeRegionOperation();
- if (!mutations.isEmpty()) {
- this.incMemStoreSize(memstoreAccounting.getMemStoreSize());
- requestFlushIfNeeded();
- }
- }
- }
-
- private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)
- throws IOException {
- try {
- processor.preProcess(this, walEdit);
- } catch (IOException e) {
- closeRegionOperation();
- throw e;
- }
- }
-
- private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
- final long now,
- final HRegion region,
- final List<Mutation> mutations,
- final WALEdit walEdit,
- final long timeout) throws IOException {
- // Short circuit the no time bound case.
- if (timeout < 0) {
- try {
- processor.process(now, region, mutations, walEdit);
- } catch (IOException e) {
- String row = processor.getRowsToLock().isEmpty() ? "" :
- " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
- LOG.warn("RowProcessor: {}, in region {}, throws Exception {}",
- processor.getClass().getName(), getRegionInfo().getRegionNameAsString(), row, e);
- throw e;
- }
- return;
- }
-
- // Case with time bound
- FutureTask<Void> task = new FutureTask<>(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- try {
- processor.process(now, region, mutations, walEdit);
- return null;
- } catch (IOException e) {
- String row = processor.getRowsToLock().isEmpty() ? "" :
- " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
- LOG.warn("RowProcessor: {}, in region {}, throws Exception {}",
- processor.getClass().getName(), getRegionInfo().getRegionNameAsString(), row, e);
- throw e;
- }
- }
- });
- rowProcessorExecutor.execute(task);
- try {
- task.get(timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ie) {
- throw throwOnInterrupt(ie);
- } catch (TimeoutException te) {
- String row = processor.getRowsToLock().isEmpty() ? "" :
- " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
- LOG.error("RowProcessor timeout: {} ms, in region {}, {}", timeout,
- getRegionInfo().getRegionNameAsString(), row);
- throw new IOException(te);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
public Result append(Append append) throws IOException {
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@@ -7928,12 +7677,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
- long now, long nonceGroup, long nonce) throws IOException {
- return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
- SequenceId.NO_SEQUENCE_ID);
- }
-
/**
* @return writeEntry associated with this append
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 1457cda..5cca201 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -257,7 +257,6 @@ public interface Region extends ConfigurationObserver {
}
/**
- *
* Get a row lock for the specified row. All locks are reentrant.
*
* Before calling this function make sure that a region operation has already been
@@ -275,8 +274,6 @@ public interface Region extends ConfigurationObserver {
* @see #startRegionOperation()
* @see #startRegionOperation(Operation)
*/
- // TODO this needs to be exposed as we have RowProcessor now. If RowProcessor is removed, we can
- // remove this too..
RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
///////////////////////////////////////////////////////////////////////////
@@ -555,53 +552,11 @@ public interface Region extends ConfigurationObserver {
* @throws IOException
*/
// TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
- // Changing processRowsWithLocks and RowProcessor
+ // Changing processRowsWithLocks
void mutateRowsWithLocks(Collection<Mutation> mutations, Collection<byte[]> rowsToLock,
long nonceGroup, long nonce) throws IOException;
/**
- * Performs atomic multiple reads and writes on a given row.
- *
- * @param processor The object defines the reads and writes to a row.
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
- * Coprocessors instead.
- */
- @Deprecated
- void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException;
-
- /**
- * Performs atomic multiple reads and writes on a given row.
- *
- * @param processor The object defines the reads and writes to a row.
- * @param nonceGroup Optional nonce group of the operation (client Id)
- * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
- * Coprocessors instead.
- */
- // TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
- // Changing processRowsWithLocks and RowProcessor
- @Deprecated
- void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
- throws IOException;
-
- /**
- * Performs atomic multiple reads and writes on a given row.
- *
- * @param processor The object defines the reads and writes to a row.
- * @param timeout The timeout of the processor.process() execution
- * Use a negative number to switch off the time bound
- * @param nonceGroup Optional nonce group of the operation (client Id)
- * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
- * Coprocessors instead.
- */
- // TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
- // Changing processRowsWithLocks and RowProcessor
- @Deprecated
- void processRowsWithLocks(RowProcessor<?,?> processor, long timeout, long nonceGroup, long nonce)
- throws IOException;
-
- /**
* Puts some data in the table.
* @param put
* @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
deleted file mode 100644
index 4a41b7f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
-
-/**
- * Defines the procedures to atomically perform multiple scans and mutations
- * on a HRegion.
- *
- * This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.
- * This class performs scans and generates mutations and WAL edits.
- * The locks and MVCC will be handled by HRegion.
- *
- * The RowProcessor user code could have data that needs to be
- * sent across for proper initialization at the server side. The generic type
- * parameter S is the type of the request data sent to the server.
- * The generic type parameter T is the return type of RowProcessor.getResult().
- *
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
- * Coprocessors instead.
- */
-@Deprecated
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Evolving
-public interface RowProcessor<S extends Message, T extends Message> {
-
- /**
- * Rows to lock during the operation.
- * They have to be sorted with <code>RowProcessor</code>
- * to avoid deadlock.
- */
- Collection<byte[]> getRowsToLock();
-
- /**
- * Obtain the processing result. All row processor implementations must
- * implement this, even if the method is simply returning an empty
- * Message.
- */
- T getResult();
-
- /**
- * Is this operation read only? If this is true, process() should not add
- * any mutations or it throws IOException.
- * @return true if this is a read-only operation
- */
- boolean readOnly();
-
- /**
- * HRegion handles the locks and MVCC and invokes this method properly.
- *
- * You should override this to create your own RowProcessor.
- *
- * If you are doing read-modify-write here, you should consider using
- * <code>IsolationLevel.READ_UNCOMMITTED</code> for scan because
- * we advance MVCC after releasing the locks for optimization purpose.
- *
- * @param now the current system millisecond
- * @param region the HRegion
- * @param mutations the output mutations to apply to memstore
- * @param walEdit the output WAL edits to apply to write ahead log
- */
- void process(long now,
- HRegion region,
- List<Mutation> mutations,
- WALEdit walEdit) throws IOException;
-
- /**
- * The hook to be executed before process().
- *
- * @param region the HRegion
- * @param walEdit the output WAL edits to apply to write ahead log
- */
- void preProcess(HRegion region, WALEdit walEdit) throws IOException;
-
- /**
- * The hook to be executed after the process() but before applying the Mutations to region. Also
- * by the time this hook is called, the mvcc transaction has started.
- * @param walEdit the output WAL edits to apply to write ahead log
- */
- void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException;
-
- /**
- * The hook to be executed after the process() and applying the Mutations to region. The
- * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will
- * be executed before the mvcc transaction completion.
- */
- void postBatchMutate(HRegion region) throws IOException;
-
- /**
- * The hook to be executed after process() and applying the Mutations to region.
- *
- * @param region the HRegion
- * @param walEdit the output WAL edits to apply to write ahead log
- * @param success true if batch operation is successful otherwise false.
- */
- void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException;
-
- /**
- * @return The cluster ids that have the change.
- */
- List<UUID> getClusterIds();
-
- /**
- * Human readable name of the processor
- * @return The name of the processor
- */
- String getName();
-
- /**
- * This method should return any additional data that is needed on the
- * server side to construct the RowProcessor. The server will pass this to
- * the {@link #initialize(Message msg)} method. If there is no RowProcessor
- * specific data then null should be returned.
- * @return the PB message
- * @throws IOException
- */
- S getRequestData() throws IOException;
-
- /**
- * This method should initialize any field(s) of the RowProcessor with
- * a parsing of the passed message bytes (used on the server side).
- * @param msg
- * @throws IOException
- */
- void initialize(S msg) throws IOException;
-
- /**
- * @return The {@link Durability} to use
- */
- Durability useDurability();
-}