You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/02 01:44:58 UTC
hbase git commit: HBASE-14575 Relax region read lock for compactions
(Nick and Ted)
Repository: hbase
Updated Branches:
refs/heads/master 92e178df2 -> d8b30b892
HBASE-14575 Relax region read lock for compactions (Nick and Ted)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8b30b89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8b30b89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8b30b89
Branch: refs/heads/master
Commit: d8b30b892561126b02bd5d27dbe11a01f97908d1
Parents: 92e178d
Author: tedyu <yu...@gmail.com>
Authored: Tue Dec 1 16:44:59 2015 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Dec 1 16:44:59 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 84 ++++++++++++++++++--
.../regionserver/TestHRegionServerBulkLoad.java | 73 ++++++++++++++---
2 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b30b89/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7bf4855..557edd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1793,8 +1793,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
MonitoredTask status = null;
boolean requestNeedsCancellation = true;
- // block waiting for the lock for compaction
- lock.readLock().lock();
+ /*
+ * We are trying to remove / relax the region read lock for compaction.
+ * Let's see what the potential race conditions are among the operations (user scan,
+ * region split, region close and region bulk load).
+ *
+ * user scan ---> region read lock
+ * region split --> region close first --> region write lock
+ * region close --> region write lock
+ * region bulk load --> region write lock
+ *
+ * read lock is compatible with read lock. ---> no problem with user scan/read
+ * region bulk load does not cause problem for compaction (no consistency problem, store lock
+ * will help the store file accounting).
+ * They can run almost concurrently at the region level.
+ *
+ * The only remaining race condition is between the region close and compaction.
+ * So we will evaluate, below, how region close interferes with compaction if compaction does
+ * not acquire region read lock.
+ *
+ * Here are the steps for compaction:
+ * 1. obtain list of StoreFile's
+ * 2. create StoreFileScanner's based on list from #1
+ * 3. perform compaction and save resulting files under tmp dir
+ * 4. swap in compacted files
+ *
+ * #1 is guarded by store lock. This patch does not change this --> no worse or better
+ * For #2, we obtain smallest read point (for region) across all the Scanners (for both default
+ * compactor and stripe compactor).
+ * The read points are for user scans. Region keeps the read points for all currently open
+ * user scanners.
+ * Compaction needs to know the smallest read point so that during re-write of the hfiles,
+ * it can remove the mvcc points for the cells if their mvccs are older than the smallest
+ * since they are not needed anymore.
+ * This will not conflict with compaction.
+ * For #3, it can be performed in parallel to other operations.
+ * For #4 bulk load and compaction don't conflict with each other on the region level
+ * (for multi-family atomicity).
+ * Region close and compaction are guarded pretty well by the 'writestate'.
+ * In HRegion#doClose(), we have :
+ * synchronized (writestate) {
+ * // Disable compacting and flushing by background threads for this
+ * // region.
+ * canFlush = !writestate.readOnly;
+ * writestate.writesEnabled = false;
+ * LOG.debug("Closing " + this + ": disabling compactions & flushes");
+ * waitForFlushesAndCompactions();
+ * }
+ * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0.
+ * and in HRegion.compact()
+ * try {
+ * synchronized (writestate) {
+ * if (writestate.writesEnabled) {
+ * wasStateSet = true;
+ * ++writestate.compacting;
+ * } else {
+ * String msg = "NOT compacting region " + this + ". Writes disabled.";
+ * LOG.info(msg);
+ * status.abort(msg);
+ * return false;
+ * }
+ * }
+ * Also in compactor.performCompaction():
+ * check periodically to see if a system stop is requested
+ * if (closeCheckInterval > 0) {
+ * bytesWritten += len;
+ * if (bytesWritten > closeCheckInterval) {
+ * bytesWritten = 0;
+ * if (!store.areWritesEnabled()) {
+ * progress.cancel();
+ * return false;
+ * }
+ * }
+ * }
+ */
try {
byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
if (stores.get(cf) != store) {
@@ -1852,12 +1924,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.markComplete("Compaction complete");
return true;
} finally {
- try {
- if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
- if (status != null) status.cleanup();
- } finally {
- lock.readLock().unlock();
- }
+ if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
+ if (status != null) status.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b30b89/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 22e91f0..87cbab7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -17,7 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.collect.Lists;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +52,9 @@ import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -60,24 +73,20 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
/**
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
* the region server's bullkLoad functionality.
*/
+@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
@@ -85,6 +94,7 @@ public class TestHRegionServerBulkLoad {
private final static Configuration conf = UTIL.getConfiguration();
private final static byte[] QUAL = Bytes.toBytes("qual");
private final static int NUM_CFS = 10;
+ private int sleepDuration;
public static int BLOCKSIZE = 64 * 1024;
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
@@ -94,6 +104,24 @@ public class TestHRegionServerBulkLoad {
families[i] = Bytes.toBytes(family(i));
}
}
+ @Parameters
+ public static final Collection<Object[]> parameters() {
+ int[] sleepDurations = new int[] { 0, 30000 };
+ List<Object[]> configurations = new ArrayList<Object[]>();
+ for (int i : sleepDurations) {
+ configurations.add(new Object[] { i });
+ }
+ return configurations;
+ }
+
+ public TestHRegionServerBulkLoad(int duration) {
+ this.sleepDuration = duration;
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf.setInt("hbase.rpc.timeout", 10 * 1000);
+ }
/**
* Create a rowkey compatible with
@@ -189,8 +217,8 @@ public class TestHRegionServerBulkLoad {
caller.callWithRetries(callable, Integer.MAX_VALUE);
// Periodically do compaction to reduce the number of open file handles.
- if (numBulkLoads.get() % 10 == 0) {
- // 10 * 50 = 500 open file handles!
+ if (numBulkLoads.get() % 5 == 0) {
+ // 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
@@ -211,6 +239,23 @@ public class TestHRegionServerBulkLoad {
}
}
+ public static class MyObserver extends BaseRegionObserver {
+ static int sleepDuration;
+ @Override
+ public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+ final Store store, final InternalScanner scanner, final ScanType scanType)
+ throws IOException {
+ try {
+ Thread.sleep(sleepDuration);
+ } catch (InterruptedException ie) {
+ IOException ioe = new InterruptedIOException();
+ ioe.initCause(ie);
+ throw ioe;
+ }
+ return scanner;
+ }
+ }
+
/**
* Thread that does full scans of the table looking for any partially
* completed rows.
@@ -278,6 +323,8 @@ public class TestHRegionServerBulkLoad {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);
+ htd.addCoprocessor(MyObserver.class.getName());
+ MyObserver.sleepDuration = this.sleepDuration;
for (int i = 0; i < 10; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
@@ -348,7 +395,7 @@ public class TestHRegionServerBulkLoad {
public static void main(String args[]) throws Exception {
try {
Configuration c = HBaseConfiguration.create();
- TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+ TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
test.setConf(c);
test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
} finally {