You are viewing a plain text version of this content. The canonical version of this commit is available at the Apache git repository links listed below.
Posted to commits@hbase.apache.org by la...@apache.org on 2016/11/01 19:49:29 UTC
[05/50] [abbrv] hbase git commit: HBASE-16788 Guard HFile archiving
under a separate lock
HBASE-16788 Guard HFile archiving under a separate lock
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/89bef67d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/89bef67d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/89bef67d
Branch: refs/heads/branch-1
Commit: 89bef67d0c020662599f682309c47a5ed25c9b32
Parents: 59ca4da
Author: Gary Helmling <ga...@apache.org>
Authored: Fri Oct 7 10:42:20 2016 -0700
Committer: Gary Helmling <ga...@apache.org>
Committed: Mon Oct 10 16:06:55 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HStore.java | 54 +++--
.../TestCompactionArchiveConcurrentClose.java | 198 +++++++++++++++++++
2 files changed, 236 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/89bef67d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6ee6bb5..74f5a1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -149,6 +150,19 @@ public class HStore implements Store {
* - completing a compaction
*/
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /**
+ * Lock specific to archiving compacted store files. This avoids races around
+ * the combination of retrieving the list of compacted files and moving them to
+ * the archive directory. Since this is usually a background process (other than
+ * on close), we don't want to handle this with the store write lock, which would
+ * block readers and degrade performance.
+ *
+ * Locked by:
+ * - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
+ * - close()
+ */
+ final ReentrantLock archiveLock = new ReentrantLock();
+
private final boolean verifyBulkLoads;
private ScanInfo scanInfo;
@@ -835,6 +849,7 @@ public class HStore implements Store {
@Override
public ImmutableCollection<StoreFile> close() throws IOException {
+ this.archiveLock.lock();
this.lock.writeLock().lock();
try {
// Clear so metrics doesn't find them.
@@ -890,6 +905,7 @@ public class HStore implements Store {
return result;
} finally {
this.lock.writeLock().unlock();
+ this.archiveLock.unlock();
}
}
@@ -2641,26 +2657,32 @@ public class HStore implements Store {
}
@Override
- public void closeAndArchiveCompactedFiles() throws IOException {
- lock.readLock().lock();
- Collection<StoreFile> copyCompactedfiles = null;
+ public synchronized void closeAndArchiveCompactedFiles() throws IOException {
+ // ensure other threads do not attempt to archive the same files on close()
+ archiveLock.lock();
try {
- Collection<StoreFile> compactedfiles =
- this.getStoreEngine().getStoreFileManager().getCompactedfiles();
- if (compactedfiles != null && compactedfiles.size() != 0) {
- // Do a copy under read lock
- copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("No compacted files to archive");
- return;
+ lock.readLock().lock();
+ Collection<StoreFile> copyCompactedfiles = null;
+ try {
+ Collection<StoreFile> compactedfiles =
+ this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+ if (compactedfiles != null && compactedfiles.size() != 0) {
+ // Do a copy under read lock
+ copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No compacted files to archive");
+ return;
+ }
}
+ } finally {
+ lock.readLock().unlock();
+ }
+ if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
+ removeCompactedfiles(copyCompactedfiles);
}
} finally {
- lock.readLock().unlock();
- }
- if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
- removeCompactedfiles(copyCompactedfiles);
+ archiveLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/89bef67d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
new file mode 100644
index 0000000..03072e2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests a race condition between archiving of compacted files in CompactedHFilesDischarger chore
+ * and HRegion.close();
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestCompactionArchiveConcurrentClose {
+ public HBaseTestingUtility testUtil;
+
+ private Path testDir;
+ private AtomicBoolean archived = new AtomicBoolean();
+
+ @Before
+ public void setup() throws Exception {
+ testUtil = HBaseTestingUtility.createLocalHTU();
+ testDir = testUtil.getDataTestDir("TestStoreFileRefresherChore");
+ FSUtils.setRootDir(testUtil.getConfiguration(), testDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ testUtil.cleanupTestDir();
+ }
+
+ @Test
+ public void testStoreCloseAndDischargeRunningInParallel() throws Exception {
+ byte[] fam = Bytes.toBytes("f");
+ byte[] col = Bytes.toBytes("c");
+ byte[] val = Bytes.toBytes("val");
+
+ TableName tableName = TableName.valueOf(getClass().getSimpleName());
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(fam));
+ HRegionInfo info = new HRegionInfo(tableName, null, null, false);
+ final Region region = initHRegion(htd, info);
+ RegionServerServices rss = mock(RegionServerServices.class);
+ List<Region> regions = new ArrayList<Region>();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+
+ // Create the cleaner object
+ final CompactedHFilesDischarger cleaner =
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+ // Add some data to the region and do some flushes
+ int batchSize = 10;
+ int fileCount = 10;
+ for (int f = 0; f < fileCount; f++) {
+ int start = f * batchSize;
+ for (int i = start; i < start + batchSize; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, col, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ }
+
+ Store store = region.getStore(fam);
+ assertEquals(fileCount, store.getStorefilesCount());
+
+ Collection<StoreFile> storefiles = store.getStorefiles();
+ // None of the files should be in compacted state.
+ for (StoreFile file : storefiles) {
+ assertFalse(file.isCompactedAway());
+ }
+ // Do compaction
+ region.compact(true);
+
+ // now run the cleaner with a concurrent close
+ Thread cleanerThread = new Thread() {
+ public void run() {
+ cleaner.chore();
+ }
+ };
+ cleanerThread.start();
+ // wait for cleaner to pause
+ synchronized (archived) {
+ if (!archived.get()) {
+ archived.wait();
+ }
+ }
+ final AtomicReference<Exception> closeException = new AtomicReference<>();
+ Thread closeThread = new Thread() {
+ public void run() {
+ // wait for the chore to complete and call close
+ try {
+ ((HRegion) region).close();
+ } catch (IOException e) {
+ closeException.set(e);
+ }
+ }
+ };
+ closeThread.start();
+ // no error should occur after the execution of the test
+ closeThread.join();
+ cleanerThread.join();
+
+ if (closeException.get() != null) {
+ throw closeException.get();
+ }
+ }
+
+ private Region initHRegion(HTableDescriptor htd, HRegionInfo info)
+ throws IOException {
+ Configuration conf = testUtil.getConfiguration();
+ Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
+
+ HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf),
+ tableDir, info);
+ final Configuration walConf = new Configuration(conf);
+ FSUtils.setRootDir(walConf, tableDir);
+ final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName());
+ HRegion region =
+ new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()),
+ conf, htd, null);
+
+ region.initialize();
+
+ return region;
+ }
+
+ private class WaitingHRegionFileSystem extends HRegionFileSystem {
+
+ public WaitingHRegionFileSystem(final Configuration conf, final FileSystem fs,
+ final Path tableDir, final HRegionInfo regionInfo) {
+ super(conf, fs, tableDir, regionInfo);
+ }
+
+ @Override
+ public void removeStoreFiles(String familyName, Collection<StoreFile> storeFiles)
+ throws IOException {
+ super.removeStoreFiles(familyName, storeFiles);
+ archived.set(true);
+ synchronized (archived) {
+ archived.notifyAll();
+ }
+ try {
+ // unfortunately we can't use a stronger barrier here as the fix synchronizing
+ // the race condition will then block
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException("Interrupted waiting for latch");
+ }
+ }
+ }
+}