Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/04 05:51:24 UTC
[2/2] hbase git commit: HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)
HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b3d1f14
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b3d1f14
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b3d1f14
Branch: refs/heads/master
Commit: 8b3d1f144408e4a7a014c5ac46418c9e91b9b0db
Parents: b326042
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Dec 4 10:20:46 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Dec 4 10:20:46 2015 +0530
----------------------------------------------------------------------
.../regionserver/DefaultStoreFileManager.java | 58 ++-
.../hadoop/hbase/regionserver/HRegion.java | 26 +-
.../hadoop/hbase/regionserver/HStore.java | 213 ++++++----
.../regionserver/ReversedStoreScanner.java | 19 +-
.../apache/hadoop/hbase/regionserver/Store.java | 5 +
.../hadoop/hbase/regionserver/StoreFile.java | 67 +++-
.../hbase/regionserver/StoreFileManager.java | 25 +-
.../hbase/regionserver/StoreFileScanner.java | 3 +
.../hadoop/hbase/regionserver/StoreScanner.java | 384 +++++++++---------
.../regionserver/StripeStoreFileManager.java | 82 +++-
.../compactions/CompactedHFilesDischarger.java | 74 ++++
.../org/apache/hadoop/hbase/TestIOFencing.java | 11 -
.../TestZooKeeperTableArchiveClient.java | 23 +-
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 16 +
.../master/cleaner/TestSnapshotFromMaster.java | 8 +
.../hbase/regionserver/MockStoreFile.java | 5 +
.../regionserver/TestEncryptionKeyRotation.java | 59 ++-
.../regionserver/TestHRegionReplayEvents.java | 3 +
.../TestRegionMergeTransactionOnCluster.java | 29 +-
.../hbase/regionserver/TestRegionReplicas.java | 11 +-
.../hadoop/hbase/regionserver/TestStore.java | 13 +
.../TestStripeStoreFileManager.java | 19 +
.../TestCompactedHFilesDischarger.java | 389 +++++++++++++++++++
24 files changed, 1211 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 9f38b9e..d38306c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager {
* is atomically replaced when its contents change.
*/
private volatile ImmutableList<StoreFile> storefiles = null;
+ /**
+ * List of compacted files inside this store that need to be excluded from reads
+ * because all new reads will use only the files newly created by the compaction.
+ * These compacted files will be deleted/cleared once all the existing readers on
+ * these compacted files are done.
+ */
+ private volatile List<StoreFile> compactedfiles = null;
public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf,
CompactionConfiguration comConf) {
@@ -75,6 +82,11 @@ class DefaultStoreFileManager implements StoreFileManager {
}
@Override
+ public Collection<StoreFile> getCompactedfiles() {
+ return compactedfiles;
+ }
+
+ @Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.addAll(sfs);
@@ -89,19 +101,55 @@ class DefaultStoreFileManager implements StoreFileManager {
}
@Override
+ public Collection<StoreFile> clearCompactedFiles() {
+ List<StoreFile> result = compactedfiles;
+ compactedfiles = new ArrayList<StoreFile>();
+ return result;
+ }
+
+ @Override
public final int getStorefileCount() {
return storefiles.size();
}
@Override
public void addCompactionResults(
- Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
+ Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
- newStoreFiles.removeAll(compactedFiles);
+ newStoreFiles.removeAll(newCompactedfiles);
if (!results.isEmpty()) {
newStoreFiles.addAll(results);
}
sortAndSetStoreFiles(newStoreFiles);
+ ArrayList<StoreFile> updatedCompactedfiles = null;
+ if (this.compactedfiles != null) {
+ updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+ updatedCompactedfiles.addAll(newCompactedfiles);
+ } else {
+ updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles);
+ }
+ markCompactedAway(newCompactedfiles);
+ this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+ }
+
+ // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
+ // A background thread will close the actual readers on these compacted files and also
+ // evict their blocks from the block cache so that they are no longer cached.
+ private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+ for (StoreFile file : compactedFiles) {
+ file.markCompactedAway();
+ }
+ }
+
+ @Override
+ public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException {
+ ArrayList<StoreFile> updatedCompactedfiles = null;
+ if (this.compactedfiles != null) {
+ updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+ updatedCompactedfiles.removeAll(removedCompactedfiles);
+ this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+ }
}
@Override
@@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager {
storefiles = ImmutableList.copyOf(storeFiles);
}
+ private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
+ // Sorting may not be really needed here for the compacted files?
+ Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
+ return new ArrayList<StoreFile>(storefiles);
+ }
+
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount();
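----------------------------------------------------------------------
The DefaultStoreFileManager changes above follow a copy-on-write pattern:
the volatile lists are never mutated in place; an updated copy is built and
the field is reassigned in one volatile write, so concurrent readers always
see a consistent snapshot. A minimal self-contained sketch of that pattern,
using a generic file type rather than the real StoreFile API (an
illustration under those assumptions, not HBase code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class CopyOnWriteFileTracker<F> {
  // Readers see either the old list or the new one, never a half-built one.
  private volatile List<F> active = Collections.emptyList();
  private volatile List<F> compacted = Collections.emptyList();

  Collection<F> getActive() { return active; }
  Collection<F> getCompacted() { return compacted; }

  // Move compaction inputs out of the active set and remember them as
  // compacted; they are physically removed later, once no reader uses them.
  synchronized void addCompactionResults(Collection<F> inputs, Collection<F> outputs) {
    List<F> newActive = new ArrayList<>(active);
    newActive.removeAll(inputs);
    newActive.addAll(outputs);
    active = Collections.unmodifiableList(newActive); // single volatile write

    List<F> newCompacted = new ArrayList<>(compacted);
    newCompacted.addAll(inputs);
    compacted = Collections.unmodifiableList(newCompacted);
  }

  // Called by a background cleaner after the files were safely archived.
  synchronized void removeCompactedFiles(Collection<F> archived) {
    List<F> newCompacted = new ArrayList<>(compacted);
    newCompacted.removeAll(archived);
    compacted = Collections.unmodifiableList(newCompacted);
  }
}
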
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 557edd9..484d5ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -148,6 +149,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -297,6 +299,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final Configuration conf;
private final Configuration baseConf;
private final int rowLockWaitDuration;
+ private CompactedHFilesDischarger compactedFileDischarger;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
// The internal wait duration to acquire a lock before read/update
@@ -809,6 +812,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeStores(reporter, status);
+ // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
+ // that will no longer be used in reads.
+ if (this.getRegionServerServices() != null) {
+ ChoreService choreService = this.getRegionServerServices().getChoreService();
+ if (choreService != null) {
+ // Default is 2 mins; the TTLCleaner default is 5 mins, so this ensures compacted
+ // files are archived before the TTLCleaner runs.
+ int cleanerInterval =
+ conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
+ this.compactedFileDischarger =
+ new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
+ choreService.scheduleChore(compactedFileDischarger);
+ }
+ }
this.mvcc.advanceTo(maxSeqId);
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
// Recover any edits if available.
@@ -1513,6 +1530,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
+ // stop the Compacted hfile discharger
+ if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
+
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@@ -6658,6 +6678,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
+ // clear the compacted files if any
+ for (Store s : dstRegion.getStores()) {
+ s.closeAndArchiveCompactedFiles();
+ }
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");
@@ -7527,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
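----------------------------------------------------------------------
The HRegion hunk above wires the new chore into the region server's
ChoreService when the region opens and cancels it when the region closes.
Outside HBase, the same schedule/cancel life cycle could be sketched with a
plain ScheduledExecutorService (hypothetical class and names; the real code
uses ScheduledChore/ChoreService):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CompactedFilesDischargeTask {
  private final ScheduledExecutorService chores =
      Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> handle;

  // Region open: start the periodic cleanup. The diff defaults to 2 minutes
  // so files are archived before a 5-minute TTL cleaner would see them.
  void start(Runnable discharge, long intervalMs) {
    handle = chores.scheduleAtFixedRate(discharge, intervalMs, intervalMs,
        TimeUnit.MILLISECONDS);
  }

  // Region close: cancel the chore, interrupting a run in progress if needed.
  void stop() {
    if (handle != null) {
      handle.cancel(true);
    }
    chores.shutdown();
  }
}
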
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 184d44f..50b3de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -39,7 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -90,7 +93,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -138,6 +141,8 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
+ private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
+ private CompletionService<StoreFile> completionService = null;
/**
* RWLock for store operations.
@@ -181,7 +186,6 @@ public class HStore implements Store {
private long blockingFileCount;
private int compactionCheckMultiplier;
-
protected Encryption.Context cryptoContext = Encryption.Context.NONE;
private volatile long flushedCellsCount = 0;
@@ -272,7 +276,10 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber);
}
-
+ compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
+ conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
+ completionService =
+ new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
// Crypto context for new store files
String cipherName = family.getEncryptionType();
if (cipherName != null) {
@@ -551,14 +558,15 @@ public class HStore implements Store {
try {
Future<StoreFile> future = completionService.take();
StoreFile storeFile = future.get();
- long length = storeFile.getReader().length();
- this.storeSize += length;
- this.totalUncompressedBytes +=
- storeFile.getReader().getTotalUncompressedBytes();
- if (LOG.isDebugEnabled()) {
- LOG.debug("loaded " + storeFile.toStringDetailed());
+ if (storeFile != null) {
+ long length = storeFile.getReader().length();
+ this.storeSize += length;
+ this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loaded " + storeFile.toStringDetailed());
+ }
+ results.add(storeFile);
}
- results.add(storeFile);
} catch (InterruptedException e) {
if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
@@ -656,8 +664,7 @@ public class HStore implements Store {
region.getMVCC().advanceTo(this.getMaxSequenceId());
}
- // notify scanners, close file readers, and recompute store size
- completeCompaction(toBeRemovedStoreFiles, false);
+ completeCompaction(toBeRemovedStoreFiles);
}
private StoreFile createStoreFileAndReader(final Path p) throws IOException {
@@ -834,7 +841,6 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
- notifyChangedReadersObservers();
LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files ["
@@ -850,7 +856,10 @@ public class HStore implements Store {
try {
// Clear so metrics doesn't find them.
ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-
+ Collection<StoreFile> compactedfiles =
+ storeEngine.getStoreFileManager().clearCompactedFiles();
+ // clear the compacted files
+ removeCompactedFiles(compactedfiles);
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -865,7 +874,7 @@ public class HStore implements Store {
@Override
public Void call() throws IOException {
boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+ cacheConf != null? cacheConf.shouldEvictOnClose(): true;
f.closeReader(evictOnClose);
return null;
}
@@ -892,6 +901,9 @@ public class HStore implements Store {
}
if (ioe != null) throw ioe;
}
+ if (compactionCleanerthreadPoolExecutor != null) {
+ compactionCleanerthreadPoolExecutor.shutdownNow();
+ }
LOG.info("Closed " + this);
return result;
} finally {
@@ -1087,10 +1099,8 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
-
- // Tell listeners of the change in readers.
+ // notify to be called here - only in case of flushes
notifyChangedReadersObservers();
-
if (LOG.isTraceEnabled()) {
long totalSize = 0;
for (StoreFile sf : sfs) {
@@ -1109,7 +1119,7 @@ public class HStore implements Store {
* @throws IOException
*/
private void notifyChangedReadersObservers() throws IOException {
- for (ChangedReadersObserver o: this.changedReaderObservers) {
+ for (ChangedReadersObserver o : this.changedReaderObservers) {
o.updateReaders();
}
}
@@ -1268,7 +1278,7 @@ public class HStore implements Store {
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
// At this point the store will use new files for all new scanners.
- completeCompaction(filesToCompact, true); // Archive old files & update store size.
+ completeCompaction(filesToCompact); // update store size.
logCompactionEndMessage(cr, sfs, compactionStartTime);
return sfs;
@@ -1456,7 +1466,7 @@ public class HStore implements Store {
LOG.info("Replaying compaction marker, replacing input files: " +
inputStoreFiles + " with output files : " + outputStoreFiles);
this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
- this.completeCompaction(inputStoreFiles, removeFiles);
+ this.completeCompaction(inputStoreFiles);
}
}
@@ -1508,7 +1518,7 @@ public class HStore implements Store {
this.getCoprocessorHost().postCompact(this, sf, null);
}
replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
- completeCompaction(filesToCompact, true);
+ completeCompaction(filesToCompact);
}
} finally {
synchronized (filesCompacting) {
@@ -1770,54 +1780,7 @@ public class HStore implements Store {
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles)
throws IOException {
- completeCompaction(compactedFiles, true);
- }
-
-
- /**
- * <p>It works by processing a compaction that's been written to disk.
- *
- * <p>It is usually invoked at the end of a compaction, but might also be
- * invoked at HStore startup, if the prior execution died midway through.
- *
- * <p>Moving the compacted TreeMap into place means:
- * <pre>
- * 1) Unload all replaced StoreFile, close and collect list to delete.
- * 2) Compute new store size
- * </pre>
- *
- * @param compactedFiles list of files that were compacted
- */
- @VisibleForTesting
- protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
- throws IOException {
- try {
- // Do not delete old store files until we have sent out notification of
- // change in case old files are still being accessed by outstanding scanners.
- // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
- // scenario that could have happened if continue to hold the lock.
- notifyChangedReadersObservers();
- // At this point the store will use new files for all scanners.
-
- // let the archive util decide if we should archive or delete the files
- LOG.debug("Removing store files after compaction...");
- boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- for (StoreFile compactedFile : compactedFiles) {
- compactedFile.closeReader(evictOnClose);
- }
- if (removeFiles) {
- this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
- }
- } catch (IOException e) {
- e = e instanceof RemoteException ?
- ((RemoteException)e).unwrapRemoteException() : e;
- LOG.error("Failed removing compacted files in " + this +
- ". Files we were trying to remove are " + compactedFiles.toString() +
- "; some of them may have been already removed", e);
- }
-
- // 4. Compute new store size
+ LOG.debug("Completing compaction...");
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
@@ -2248,7 +2211,7 @@ public class HStore implements Store {
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -2365,4 +2328,112 @@ public class HStore implements Store {
public boolean isPrimaryReplicaStore() {
return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
}
+
+ @Override
+ public void closeAndArchiveCompactedFiles() throws IOException {
+ lock.readLock().lock();
+ Collection<StoreFile> copyCompactedfiles = null;
+ try {
+ Collection<StoreFile> compactedfiles =
+ this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+ if (compactedfiles != null && compactedfiles.size() != 0) {
+ // Do a copy under read lock
+ copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No compacted files to archive");
+ }
+ return;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ removeCompactedFiles(copyCompactedfiles);
+ }
+
+ private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
+ return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "CompactedfilesArchiver-" + count++);
+ }
+ });
+ }
+
+ private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
+ if (compactedfiles != null && !compactedfiles.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing the compacted store files " + compactedfiles);
+ }
+ for (final StoreFile file : compactedfiles) {
+ completionService.submit(new Callable<StoreFile>() {
+ @Override
+ public StoreFile call() throws IOException {
+ synchronized (file) {
+ try {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The file " + file + " was closed but still not archived.");
+ }
+ return file;
+ }
+ if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+ // Even if deleting fails we need not bother, since any new scanners won't be
+ // able to use the compacted file: its status is already compactedAway
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing and archiving the file " + file.getPath());
+ }
+ r.close(true);
+ // Just close and return
+ return file;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while trying to close the compacted store file "
+ + file.getPath().getName());
+ }
+ }
+ return null;
+ }
+ });
+ }
+ final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+ try {
+ for (final StoreFile file : compactedfiles) {
+ Future<StoreFile> future = completionService.take();
+ StoreFile closedFile = future.get();
+ if (closedFile != null) {
+ filesToRemove.add(closedFile);
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted exception while closing the compacted files", ie);
+ } catch (Exception e) {
+ LOG.error("Exception occured while closing the compacted files", e);
+ }
+ if (isPrimaryReplicaStore()) {
+ archiveAndRemoveCompactedFiles(filesToRemove);
+ }
+
+ }
+ }
+
+ private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
+ if (!filesToArchive.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Moving the files " + filesToArchive + " to archive");
+ }
+ // Only if this is successful it has to be removed
+ this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
+ try {
+ lock.writeLock().lock();
+ this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
}
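----------------------------------------------------------------------
removeCompactedFiles() above fans the per-file close work out through an
ExecutorCompletionService and then collects only the files that were
actually safe to close, handing just those to the archiver. A compact,
self-contained sketch of that submit-then-collect shape, with a Predicate
standing in for the real compacted-away/ref-count check (illustration only,
hypothetical names):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

class ParallelCloser<F> {
  List<F> closeUnreferenced(Collection<F> files, Predicate<F> stillReferenced)
      throws Exception {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, Math.min(files.size(), 10)));
    ExecutorCompletionService<F> cs = new ExecutorCompletionService<>(pool);
    for (F file : files) {
      // Each task returns the file if it could be closed, or null if it must
      // wait for in-flight scanners to finish (retried on the next chore run).
      cs.submit(() -> stillReferenced.test(file) ? null : file);
    }
    List<F> closed = new ArrayList<>();
    for (int i = 0; i < files.size(); i++) {
      F f = cs.take().get(); // blocks until the next task completes
      if (f != null) {
        closed.add(f);
      }
    }
    pool.shutdown();
    return closed; // only these are handed to the archiver
  }
}
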
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index d198d7b..0e1d90f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,24 +123,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.seekToPreviousRow(key);
- } finally {
- lock.unlock();
- }
-
+ checkReseek();
+ return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.backwardSeek(key);
- } finally {
- lock.unlock();
- }
+ checkReseek();
+ return this.heap.backwardSeek(key);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 83a24a5..f137a8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -466,4 +466,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
boolean isPrimaryReplicaStore();
+
+ /**
+ * Closes and archives the compacted files under this store
+ */
+ void closeAndArchiveCompactedFiles() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 6e5f441..2b9d101 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -378,6 +381,19 @@ public class StoreFile {
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
+ @VisibleForTesting
+ public boolean isCompactedAway() {
+ if (this.reader != null) {
+ return this.reader.isCompactedAway();
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ public int getRefCount() {
+ return this.reader.refCount.get();
+ }
+
/**
* Return the timestamp at which this bulk load file was generated.
*/
@@ -553,6 +569,15 @@ public class StoreFile {
}
/**
+ * Marks the status of the file as compactedAway.
+ */
+ public void markCompactedAway() {
+ if (this.reader != null) {
+ this.reader.markCompactedAway();
+ }
+ }
+
+ /**
* Delete this file
* @throws IOException
*/
@@ -1137,6 +1162,12 @@ public class StoreFile {
private boolean bulkLoadResult = false;
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
private boolean skipResetSeqId = true;
+ // Counter that is incremented every time a scanner is created on the
+ // store file. It is decremented when the scan on the store file is
+ // done.
+ private AtomicInteger refCount = new AtomicInteger(0);
+ // Indicates if the file got compacted
+ private volatile boolean compactedAway = false;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
@@ -1144,6 +1175,10 @@ public class StoreFile {
bloomFilterType = BloomType.NONE;
}
+ void markCompactedAway() {
+ this.compactedAway = true;
+ }
+
public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Configuration conf) throws IOException {
reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
@@ -1195,12 +1230,36 @@ public class StoreFile {
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction, long readPt) {
+ // Increment the ref count
+ refCount.incrementAndGet();
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, isCompaction),
!isCompaction, reader.hasMVCCInfo(), readPt);
}
/**
+ * Decrement the ref count associated with the reader whenever a scanner associated
+ * with the reader is closed
+ */
+ void decrementRefCount() {
+ refCount.decrementAndGet();
+ }
+
+ /**
+ * @return true if the file is still used in reads
+ */
+ public boolean isReferencedInReads() {
+ return refCount.get() != 0;
+ }
+
+ /**
+ * @return true if the file is compacted
+ */
+ public boolean isCompactedAway() {
+ return this.compactedAway;
+ }
+
+ /**
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
@@ -1710,7 +1769,13 @@ public class StoreFile {
private static class GetFileSize implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
- return sf.getReader().length();
+ if (sf.getReader() != null) {
+ return sf.getReader().length();
+ } else {
+ // the reader may be null for compacted files if the archiving
+ // has failed.
+ return -1L;
+ }
}
}
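----------------------------------------------------------------------
The StoreFile.Reader changes above are classic reference counting: every
scanner increments refCount when it opens against the reader, decrements it
on close (see the StoreFileScanner hunk below), and the discharger closes a
reader only when it is both compacted away and no longer referenced. A
self-contained sketch of that invariant (simplified stand-in, not the HBase
classes):

import java.util.concurrent.atomic.AtomicInteger;

class RefCountedReader {
  private final AtomicInteger refCount = new AtomicInteger(0);
  private volatile boolean compactedAway = false;

  void markCompactedAway() { compactedAway = true; }

  // Called when a scanner is created against this reader.
  void acquire() { refCount.incrementAndGet(); }

  // Called from the scanner's close().
  void release() { refCount.decrementAndGet(); }

  boolean isReferencedInReads() { return refCount.get() != 0; }

  // The background discharger may close this reader only when the file has
  // been replaced by compaction output AND no scanner is still using it.
  boolean safeToCloseAndArchive() {
    return compactedAway && !isReferencedInReads();
  }
}
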
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 11993db..7e70547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -53,7 +53,7 @@ public interface StoreFileManager {
void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
/**
- * Adds compaction results into the structure.
+ * Adds only the new compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
@@ -61,12 +61,26 @@ public interface StoreFileManager {
Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
/**
+ * Removes the compacted files from the structure.
+ * @param compactedFiles the list of compacted files
+ * @throws IOException
+ */
+ void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException;
+
+ /**
* Clears all the files currently in use and returns them.
* @return The files previously in use.
*/
ImmutableCollection<StoreFile> clearFiles();
/**
+ * Clears all the compacted files and returns them. This method is expected to be
+ * called from a single thread.
+ * @return The files compacted previously.
+ */
+ Collection<StoreFile> clearCompactedFiles();
+
+ /**
* Gets the snapshot of the store files currently in use. Can be used for things like metrics
* and checks; should not assume anything about relations between store files in the list.
* @return The list of StoreFiles.
@@ -74,6 +88,15 @@ public interface StoreFileManager {
Collection<StoreFile> getStorefiles();
/**
+ * List of compacted files inside this store that need to be excluded from reads
+ * because all new reads will use only the files newly created by the compaction.
+ * These compacted files will be deleted/cleared once all the existing readers on
+ * these compacted files are done.
+ * @return the list of compacted files
+ */
+ Collection<StoreFile> getCompactedfiles();
+
+ /**
* Returns the number of files currently in use.
* @return The number of files.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index fb154c0..c864733 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -248,6 +248,9 @@ public class StoreFileScanner implements KeyValueScanner {
public void close() {
cur = null;
this.hfs.close();
+ if (this.reader != null) {
+ this.reader.decrementRefCount();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index a4e066c..44f07f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -125,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// A flag whether use pread for scan
private boolean scanUsePread = false;
- protected ReentrantLock lock = new ReentrantLock();
+ // Indicates whether there was flush during the course of the scan
+ protected volatile boolean flushed = false;
protected final long readPt;
@@ -403,15 +403,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
- lock.lock();
- try {
+ checkResetHeap();
if (this.heap == null) {
return this.lastTop;
}
return this.heap.peek();
- } finally {
- lock.unlock();
- }
}
@Override
@@ -425,46 +421,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
close(true);
}
- private void close(boolean withHeapClose){
- lock.lock();
- try {
- if (this.closing) {
- return;
+ private void close(boolean withHeapClose) {
+ if (this.closing) {
+ return;
+ }
+ if (withHeapClose) this.closing = true;
+ // Under test, we don't have a this.store
+ if (this.store != null) this.store.deleteChangedReaderObserver(this);
+ if (withHeapClose) {
+ for (KeyValueHeap h : this.heapsForDelayedClose) {
+ h.close();
}
- if (withHeapClose) this.closing = true;
- // Under test, we dont have a this.store
- if (this.store != null) this.store.deleteChangedReaderObserver(this);
- if (withHeapClose) {
- for (KeyValueHeap h : this.heapsForDelayedClose) {
- h.close();
- }
- this.heapsForDelayedClose.clear();
- if (this.heap != null) {
- this.heap.close();
- this.heap = null; // CLOSED!
- }
- } else {
- if (this.heap != null) {
- this.heapsForDelayedClose.add(this.heap);
- this.heap = null;
- }
+ this.heapsForDelayedClose.clear();
+ if (this.heap != null) {
+ this.heap.close();
+ this.heap = null; // CLOSED!
+ }
+ } else {
+ if (this.heap != null) {
+ this.heapsForDelayedClose.add(this.heap);
+ this.heap = null;
}
- this.lastTop = null; // If both are null, we are closed.
- } finally {
- lock.unlock();
}
+ this.lastTop = null; // If both are null, we are closed.
}
@Override
public boolean seek(Cell key) throws IOException {
- lock.lock();
- try {
+ checkResetHeap();
// reset matcher state, in case that underlying store changed
checkReseek();
return this.heap.seek(key);
- } finally {
- lock.unlock();
- }
}
@Override
@@ -480,173 +467,168 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
- lock.lock();
- try {
- if (scannerContext == null) {
- throw new IllegalArgumentException("Scanner context cannot be null");
- }
- if (checkReseek()) {
- return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
- }
+ if (scannerContext == null) {
+ throw new IllegalArgumentException("Scanner context cannot be null");
+ }
+ checkResetHeap();
+ if (checkReseek()) {
+ return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+ }
- // if the heap was left null, then the scanners had previously run out anyways, close and
- // return.
- if (this.heap == null) {
- // By this time partial close should happened because already heap is null
- close(false);// Do all cleanup except heap.close()
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ // if the heap was left null, then the scanners had previously run out anyways, close and
+ // return.
+ if (this.heap == null) {
+ // By this time a partial close should have happened because the heap is already null
+ close(false);// Do all cleanup except heap.close()
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ }
- Cell cell = this.heap.peek();
- if (cell == null) {
- close(false);// Do all cleanup except heap.close()
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ Cell cell = this.heap.peek();
+ if (cell == null) {
+ close(false);// Do all cleanup except heap.close()
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ }
- // only call setRow if the row changes; avoids confusing the query matcher
- // if scanning intra-row
+ // only call setRow if the row changes; avoids confusing the query matcher
+ // if scanning intra-row
- // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
- // rows. Else it is possible we are still traversing the same row so we must perform the row
- // comparison.
- if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
- !CellUtil.matchingRow(cell, matcher.curCell)) {
- this.countPerRow = 0;
- matcher.setToNewRow(cell);
- }
+ // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
+ // rows. Else it is possible we are still traversing the same row so we must perform the row
+ // comparison.
+ if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null
+ || !CellUtil.matchingRow(cell, matcher.curCell)) {
+ this.countPerRow = 0;
+ matcher.setToNewRow(cell);
+ }
- // Clear progress away unless invoker has indicated it should be kept.
- if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
+ // Clear progress away unless invoker has indicated it should be kept.
+ if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
- // Only do a sanity-check if store and comparator are available.
- CellComparator comparator = store != null ? store.getComparator() : null;
+ // Only do a sanity-check if store and comparator are available.
+ CellComparator comparator = store != null ? store.getComparator() : null;
- int count = 0;
- long totalBytesRead = 0;
+ int count = 0;
+ long totalBytesRead = 0;
- LOOP: do {
- // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
- if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
- scannerContext.updateTimeProgress();
- if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
- return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
- }
+ LOOP: do {
+ // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
+ if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+ scannerContext.updateTimeProgress();
+ if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+ return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
+ }
- if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
- checkScanOrder(prevCell, cell, comparator);
- prevCell = cell;
+ if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
+ checkScanOrder(prevCell, cell, comparator);
+ prevCell = cell;
- ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
- qcode = optimize(qcode, cell);
- switch(qcode) {
- case INCLUDE:
- case INCLUDE_AND_SEEK_NEXT_ROW:
- case INCLUDE_AND_SEEK_NEXT_COL:
+ ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
+ qcode = optimize(qcode, cell);
+ switch (qcode) {
+ case INCLUDE:
+ case INCLUDE_AND_SEEK_NEXT_ROW:
+ case INCLUDE_AND_SEEK_NEXT_COL:
- Filter f = matcher.getFilter();
- if (f != null) {
- cell = f.transformCell(cell);
- }
+ Filter f = matcher.getFilter();
+ if (f != null) {
+ cell = f.transformCell(cell);
+ }
- this.countPerRow++;
- if (storeLimit > -1 &&
- this.countPerRow > (storeLimit + storeOffset)) {
- // do what SEEK_NEXT_ROW does.
- if (!matcher.moreRowsMayExistAfter(cell)) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
- seekToNextRow(cell);
- break LOOP;
+ this.countPerRow++;
+ if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
+ // do what SEEK_NEXT_ROW does.
+ if (!matcher.moreRowsMayExistAfter(cell)) {
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
+ seekToNextRow(cell);
+ break LOOP;
+ }
- // add to results only if we have skipped #storeOffset kvs
- // also update metric accordingly
- if (this.countPerRow > storeOffset) {
- outResult.add(cell);
+ // add to results only if we have skipped #storeOffset kvs
+ // also update metric accordingly
+ if (this.countPerRow > storeOffset) {
+ outResult.add(cell);
- // Update local tracking information
- count++;
- totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+ // Update local tracking information
+ count++;
+ totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
- // Update the progress of the scanner context
- scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
- scannerContext.incrementBatchProgress(1);
+ // Update the progress of the scanner context
+ scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+ scannerContext.incrementBatchProgress(1);
- if (totalBytesRead > maxRowSize) {
- throw new RowTooBigException("Max row size allowed: " + maxRowSize
- + ", but the row is bigger than that.");
- }
+ if (totalBytesRead > maxRowSize) {
+ throw new RowTooBigException(
+ "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
}
+ }
- if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
- if (!matcher.moreRowsMayExistAfter(cell)) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
- seekToNextRow(cell);
- } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
- seekAsDirection(matcher.getKeyForNextColumn(cell));
- } else {
- this.heap.next();
+ if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+ if (!matcher.moreRowsMayExistAfter(cell)) {
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
+ seekToNextRow(cell);
+ } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
+ seekAsDirection(matcher.getKeyForNextColumn(cell));
+ } else {
+ this.heap.next();
+ }
- if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
- break LOOP;
- }
- if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
- break LOOP;
- }
- continue;
+ if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
+ break LOOP;
+ }
+ if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
+ break LOOP;
+ }
+ continue;
- case DONE:
- return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+ case DONE:
+ return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
- case DONE_SCAN:
- close(false);// Do all cleanup except heap.close()
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ case DONE_SCAN:
+ close(false);// Do all cleanup except heap.close()
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- case SEEK_NEXT_ROW:
- // This is just a relatively simple end of scan fix, to short-cut end
- // us if there is an endKey in the scan.
- if (!matcher.moreRowsMayExistAfter(cell)) {
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- }
+ case SEEK_NEXT_ROW:
+ // This is just a relatively simple end of scan fix, to short-cut end
+ // us if there is an endKey in the scan.
+ if (!matcher.moreRowsMayExistAfter(cell)) {
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+ }
- seekToNextRow(cell);
- break;
+ seekToNextRow(cell);
+ break;
- case SEEK_NEXT_COL:
- seekAsDirection(matcher.getKeyForNextColumn(cell));
- break;
+ case SEEK_NEXT_COL:
+ seekAsDirection(matcher.getKeyForNextColumn(cell));
+ break;
- case SKIP:
- this.heap.next();
- break;
-
- case SEEK_NEXT_USING_HINT:
- Cell nextKV = matcher.getNextKeyHint(cell);
- if (nextKV != null) {
- seekAsDirection(nextKV);
- } else {
- heap.next();
- }
- break;
+ case SKIP:
+ this.heap.next();
+ break;
- default:
- throw new RuntimeException("UNEXPECTED");
+ case SEEK_NEXT_USING_HINT:
+ Cell nextKV = matcher.getNextKeyHint(cell);
+ if (nextKV != null) {
+ seekAsDirection(nextKV);
+ } else {
+ heap.next();
}
- } while((cell = this.heap.peek()) != null);
+ break;
- if (count > 0) {
- return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+ default:
+ throw new RuntimeException("UNEXPECTED");
}
+ } while ((cell = this.heap.peek()) != null);
- // No more keys
- close(false);// Do all cleanup except heap.close()
- return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- } finally {
- lock.unlock();
+ if (count > 0) {
+ return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
+
+ // No more keys
+ close(false);// Do all cleanup except heap.close()
+ return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
/*
@@ -684,30 +666,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Implementation of ChangedReadersObserver
@Override
public void updateReaders() throws IOException {
- lock.lock();
- try {
- if (this.closing) return;
+ flushed = true;
+ // Let the next() call handle re-creating and seeking
+ }
+ protected void nullifyCurrentHeap() {
+ if (this.closing) return;
// All public synchronized API calls will call 'checkReseek' which will cause
// the scanner stack to reseek if this.heap==null && this.lastTop != null.
// But if two calls to updateReaders() happen without a 'next' or 'peek' then we
// will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
// which is NOT what we want, not to mention could cause an NPE. So we early out here.
if (this.heap == null) return;
-
// this could be null.
- this.lastTop = this.peek();
+ this.lastTop = this.heap.peek();
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files
this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-
- // Let the next() call handle re-creating and seeking
- } finally {
- lock.unlock();
- }
}
/**
@@ -793,18 +771,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
- lock.lock();
- try {
- //Heap will not be null, if this is called from next() which.
- //If called from RegionScanner.reseek(...) make sure the scanner
- //stack is reset if needed.
+ checkResetHeap();
+ // Heap will not be null if this is called from next().
+ // If called from RegionScanner.reseek(...) make sure the scanner
+ // stack is reset if needed.
checkReseek();
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
- } finally {
- lock.unlock();
+ }
+
+ protected void checkResetHeap() {
+ // Check the flag without any lock. Even if we see a stale value it is still
+ // OK to continue, because we will not be resetting the heap but will continue
+ // with the referenced memstore's snapshot. For compactions we don't need
+ // updateReaders() to happen at all, as we still continue with the older files.
+ if (flushed) {
+ // If 'flushed' is found to be true, the current scanner must update its heap
+ // before proceeding with the scan, and then reset the flag.
+ // One thing is certain: the same store scanner cannot be in reseek and
+ // next at the same time, i.e. within the same store scanner it is always
+ // single threaded
+ nullifyCurrentHeap();
+ // reset the flag
+ flushed = false;
}
}
@@ -883,17 +876,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public void shipped() throws IOException {
- lock.lock();
- try {
- for (KeyValueHeap h : this.heapsForDelayedClose) {
- h.close();// There wont be further fetch of Cells from these scanners. Just close.
- }
- this.heapsForDelayedClose.clear();
- if (this.heap != null) {
- this.heap.shipped();
- }
- } finally {
- lock.unlock();
+ for (KeyValueHeap h : this.heapsForDelayedClose) {
+ h.close();// There won't be further fetches of Cells from these scanners. Just close.
+ }
+ this.heapsForDelayedClose.clear();
+ if (this.heap != null) {
+ this.heap.shipped();
}
}
}
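----------------------------------------------------------------------
The StoreScanner rewrite above is the core of this change: the per-call
ReentrantLock is removed, a flush merely sets a volatile flushed flag, and
the scanner (single-threaded by contract) swaps its heap at the start of
the next peek/seek/reseek/next call via checkResetHeap(). A minimal sketch
of that coarsened handshake, with a hypothetical rebuildHeap() standing in
for the real heap re-creation and reseek logic:

class CoarsenedScanner {
  // Set by the flusher thread; read by the (single) scanning thread.
  private volatile boolean flushed = false;
  private Object heap = new Object(); // stand-in for the KeyValueHeap

  // Called from the flush path instead of rebuilding the heap under a lock.
  void updateReaders() {
    flushed = true; // the next scan call picks this up
  }

  // Every public scan entry point starts with this check.
  private void checkResetHeap() {
    if (flushed) {
      // Safe without a lock: only the scanning thread reaches here, and a
      // stale read of 'false' just defers the reset to the next call.
      heap = rebuildHeap();
      flushed = false;
    }
  }

  boolean next() {
    checkResetHeap();
    // ... scan from 'heap' as before, with no locking on the hot path ...
    return heap != null;
  }

  private Object rebuildHeap() {
    // Hypothetical: re-open scanners over the new store files / snapshot.
    return new Object();
  }
}
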
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index bb49aba..ef2c282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -106,6 +106,7 @@ public class StripeStoreFileManager
/** Cached list of all files in the structure, to return from some calls */
public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
+ private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.<StoreFile>of();
}
private State state = null;
@@ -141,8 +142,14 @@ public class StripeStoreFileManager
}
@Override
+ public Collection<StoreFile> getCompactedfiles() {
+ return state.allCompactedFilesCached;
+ }
+
+ @Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
+ // Passing null does not cause NPE??
cmc.mergeResults(null, sfs);
debugDumpState("Added new files");
}
@@ -157,6 +164,13 @@ public class StripeStoreFileManager
}
@Override
+ public ImmutableCollection<StoreFile> clearCompactedFiles() {
+ ImmutableCollection<StoreFile> result = state.allCompactedFilesCached;
+ this.state = new State();
+ return result;
+ }
+
+ @Override
public int getStorefileCount() {
return state.allFilesCached.size();
}
@@ -306,9 +320,31 @@ public class StripeStoreFileManager
// copies and apply the result at the end.
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.mergeResults(compactedFiles, results);
+ markCompactedAway(compactedFiles);
debugDumpState("Merged compaction results");
}
+ // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+ // Let a background thread close the actual reader on these compacted files and also
+ // ensure to evict the blocks from block cache so that they are no longer in
+ // cache
+ private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+ for (StoreFile file : compactedFiles) {
+ file.markCompactedAway();
+ }
+ }
+
+ @Override
+ public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException {
+ // See class comment for the assumptions we make here.
+ LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
+ // In order to be able to fail in the middle of the operation, we'll operate on lazy
+ // copies and apply the result at the end.
+ CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
+ cmc.deleteResults(compactedFiles);
+ debugDumpState("Deleted compaction results");
+ }
+
@Override
public int getStoreCompactionPriority() {
// If there's only L0, do what the default store does.
@@ -684,7 +720,7 @@ public class StripeStoreFileManager
this.isFlush = isFlush;
}
- public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
+ private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
throws IOException {
assert this.compactedFiles == null && this.results == null;
this.compactedFiles = compactedFiles;
@@ -696,12 +732,20 @@ public class StripeStoreFileManager
processNewCandidateStripes(newStripes);
}
// Create new state and update parent.
- State state = createNewState();
+ State state = createNewState(false);
StripeStoreFileManager.this.state = state;
updateMetadataMaps();
}
- private State createNewState() {
+ private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException {
+ this.compactedFiles = compactedFiles;
+ // Create new state and update parent.
+ State state = createNewState(true);
+ StripeStoreFileManager.this.state = state;
+ updateMetadataMaps();
+ }
+
+ private State createNewState(boolean delCompactedFiles) {
State oldState = StripeStoreFileManager.this.state;
// Stripe count should be the same unless the end rows changed.
assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
@@ -717,9 +761,21 @@ public class StripeStoreFileManager
}
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
- if (!isFlush) newAllFiles.removeAll(compactedFiles);
- newAllFiles.addAll(results);
+ List<StoreFile> newAllCompactedFiles =
+ new ArrayList<StoreFile>(oldState.allCompactedFilesCached);
+ if (!isFlush) {
+ newAllFiles.removeAll(compactedFiles);
+ if (delCompactedFiles) {
+ newAllCompactedFiles.removeAll(compactedFiles);
+ } else {
+ newAllCompactedFiles.addAll(compactedFiles);
+ }
+ }
+ if (results != null) {
+ newAllFiles.addAll(results);
+ }
newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
+ newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
return newState;
}
@@ -970,14 +1026,16 @@ public class StripeStoreFileManager
// Order by seqnum is reversed.
for (int i = 1; i < stripe.size(); ++i) {
StoreFile sf = stripe.get(i);
- long fileTs = sf.getReader().getMaxTimestamp();
- if (fileTs < maxTs && !filesCompacting.contains(sf)) {
- LOG.info("Found an expired store file: " + sf.getPath()
- + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
- if (expiredStoreFiles == null) {
- expiredStoreFiles = new ArrayList<StoreFile>();
+ synchronized (sf) {
+ long fileTs = sf.getReader().getMaxTimestamp();
+ if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+ LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is "
+ + fileTs + ", which is below " + maxTs);
+ if (expiredStoreFiles == null) {
+ expiredStoreFiles = new ArrayList<StoreFile>();
+ }
+ expiredStoreFiles.add(sf);
}
- expiredStoreFiles.add(sf);
}
}
return expiredStoreFiles;
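The state swap above follows a copy-on-write discipline: readers always see an immutable snapshot, while writers build a new snapshot and publish it with a single volatile write. A minimal standalone sketch of that discipline (names hypothetical, Guava's ImmutableList assumed; this is not the HBase code itself):

import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;

class CopyOnWriteFileTracker<F> {
  private volatile ImmutableList<F> liveFiles = ImmutableList.of();
  private volatile ImmutableList<F> compactedFiles = ImmutableList.of();

  /** Compaction result: move inputs from live to compacted, add outputs to live. */
  synchronized void mergeResults(List<F> inputs, List<F> outputs) {
    List<F> live = new ArrayList<>(liveFiles);
    live.removeAll(inputs);
    live.addAll(outputs);
    List<F> compacted = new ArrayList<>(compactedFiles);
    compacted.addAll(inputs);
    // Publish new immutable snapshots; in-flight readers keep the old lists.
    liveFiles = ImmutableList.copyOf(live);
    compactedFiles = ImmutableList.copyOf(compacted);
  }

  /** Discharger path: drop compacted files whose readers are all done. */
  synchronized void deleteResults(List<F> doneFiles) {
    List<F> compacted = new ArrayList<>(compactedFiles);
    compacted.removeAll(doneFiles);
    compactedFiles = ImmutableList.copyOf(compacted);
  }
}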
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
new file mode 100644
index 0000000..4cf120d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * A chore that periodically archives compacted store files once there are no active readers
+ * still using them, and evicts the corresponding entries from the block cache.
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+ private Region region;
+
+ /**
+ * @param period the period of time to sleep between each run
+ * @param stopper the stoppable that can halt this chore
+ * @param region the region whose compacted store files should be archived
+ */
+ public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
+ // TODO: wire the period to a configuration setting
+ super("CompactedHFilesCleaner", stopper, period);
+ this.region = region;
+ }
+
+ @Override
+ public void chore() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+ }
+ for (Store store : region.getStores()) {
+ try {
+ store.closeAndArchiveCompactedFiles();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completed archiving the compacted files for the region "
+ + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Exception while trying to close and archive the comapcted store files of the store "
+ + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
+ e);
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+ }
+ }
+}
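The discharger is a plain ScheduledChore, so it can be handed to a ChoreService for periodic execution or, as the test changes below do, driven synchronously via chore(). A minimal wiring sketch (the period and the class name are illustrative, not part of this patch):

import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;

final class DischargerWiring {
  /** Schedule a discharger for one region, running every two minutes. */
  static void schedule(ChoreService choreService, Stoppable stopper, Region region) {
    CompactedHFilesDischarger discharger =
        new CompactedHFilesDischarger(2 * 60 * 1000, stopper, region);
    choreService.scheduleChore(discharger);
  }
}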
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 304638a..94a63d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -198,17 +198,6 @@ public class TestIOFencing {
}
@Override
- protected void completeCompaction(final Collection<StoreFile> compactedFiles,
- boolean removeFiles) throws IOException {
- try {
- r.compactionsWaiting.countDown();
- r.compactionsBlocked.await();
- } catch (InterruptedException ex) {
- throw new IOException(ex);
- }
- super.completeCompaction(compactedFiles, removeFiles);
- }
- @Override
protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
try {
r.compactionsWaiting.countDown();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index a28112d..55e43de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -170,10 +172,11 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
- Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
-
+ HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ final CompactedHFilesDischarger compactionCleaner =
+ new CompactedHFilesDischarger(100, stop, region);
loadFlushAndCompact(region, TEST_FAM);
-
+ compactionCleaner.chore();
// get the current hfiles in the archive directory
List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) {
@@ -217,18 +220,22 @@ public class TestZooKeeperTableArchiveClient {
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
-
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
- Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ final CompactedHFilesDischarger compactionCleaner =
+ new CompactedHFilesDischarger(100, stop, region);
loadFlushAndCompact(region, TEST_FAM);
-
+ compactionCleaner.chore();
// create another table that we don't archive
hcd = new HColumnDescriptor(TEST_FAM);
- Region otherRegion = UTIL.createTestRegion(otherTable, hcd);
+ HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
+ final CompactedHFilesDischarger compactionCleaner1 =
+ new CompactedHFilesDischarger(100, stop, otherRegion);
loadFlushAndCompact(otherRegion, TEST_FAM);
-
+ compactionCleaner1.chore();
// get the current hfiles in the archive directory
+ // Should be archived
List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) {
FSUtils.logFileSystemState(fs, archiveDir, LOG);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index f6ade32..4f30960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -76,7 +76,7 @@ public class TestHeapSize {
LOG.info("name=" + b.getName());
LOG.info("specname=" + b.getSpecName());
LOG.info("specvendor=" + b.getSpecVendor());
- LOG.info("vmname=" + b.getVmName());
+ LOG.info("vmname=" + b.getVmName());
LOG.info("vmversion=" + b.getVmVersion());
LOG.info("vmvendor=" + b.getVmVendor());
Map<String, String> p = b.getSystemProperties();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 9ff88f0..337eeac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -95,6 +97,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
/**
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.
@@ -1002,6 +1006,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
+ List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
@@ -1015,6 +1025,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
+ List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 20b0642..60c5473 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneReq
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -121,6 +124,7 @@ public class TestSnapshotFromMaster {
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
ConstantSizeRegionSplitPolicy.class.getName());
+ conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
}
@@ -320,6 +324,10 @@ public class TestSnapshotFromMaster {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 2 so will compact and archive
}
+ for (HRegion region : regions) {
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+ cleaner.chore();
+ }
LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index f99226f..3614846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -79,6 +79,11 @@ public class MockStoreFile extends StoreFile {
}
@Override
+ public boolean isCompactedAway() {
+ return false;
+ }
+
+ @Override
public byte[] getMetadataValue(byte[] key) {
return this.metadata.get(key);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index 6c66c6d..82be1db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
+import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import javax.crypto.spec.SecretKeySpec;
@@ -123,13 +125,14 @@ public class TestEncryptionKeyRotation {
// And major compact
TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
+ final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName());
TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// When compaction has finished, all of the original files will be
// gone
boolean found = false;
- for (Path path: initialPaths) {
+ for (Path path: updatePaths) {
found = TEST_UTIL.getTestFileSystem().exists(path);
if (found) {
LOG.info("Found " + path);
@@ -141,14 +144,20 @@ public class TestEncryptionKeyRotation {
});
// Verify we have store file(s) with only the new key
+ Thread.sleep(1000);
+ waitForCompaction(htd.getTableName());
List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName());
assertTrue(pathsAfterCompaction.size() > 0);
for (Path path: pathsAfterCompaction) {
- assertFalse("Store file " + path + " retains initial key",
- Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
assertTrue("Store file " + path + " has incorrect key",
Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
}
+ List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName());
+ assertTrue(compactedPaths.size() > 0);
+ for (Path path: compactedPaths) {
+ assertTrue("Store file " + path + " retains initial key",
+ Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
+ }
}
@Test
@@ -194,6 +203,33 @@ public class TestEncryptionKeyRotation {
}
}
+ private static void waitForCompaction(TableName tableName)
+ throws IOException, InterruptedException {
+ boolean compacted = false;
+ for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
+ .getOnlineRegions(tableName)) {
+ for (Store store : region.getStores()) {
+ compacted = false;
+ while (!compacted) {
+ if (store.getStorefiles() != null) {
+ while (store.getStorefilesCount() != 1) {
+ Thread.sleep(100);
+ }
+ for (StoreFile storefile : store.getStorefiles()) {
+ if (!storefile.isCompactedAway()) {
+ compacted = true;
+ break;
+ }
+ Thread.sleep(100);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ }
+
private static List<Path> findStorefilePaths(TableName tableName) throws Exception {
List<Path> paths = new ArrayList<Path>();
for (Region region:
@@ -207,6 +243,23 @@ public class TestEncryptionKeyRotation {
return paths;
}
+ private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception {
+ List<Path> paths = new ArrayList<Path>();
+ for (Region region:
+ TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
+ for (Store store : region.getStores()) {
+ Collection<StoreFile> compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ if (compactedfiles != null) {
+ for (StoreFile storefile : compactedfiles) {
+ paths.add(storefile.getPath());
+ }
+ }
+ }
+ }
+ return paths;
+ }
+
private void createTableAndFlush(HTableDescriptor htd) throws Exception {
HColumnDescriptor hcd = htd.getFamilies().iterator().next();
// Create the test table
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 0c2e01c..c59d6f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -1369,6 +1370,8 @@ public class TestHRegionReplayEvents {
// Test case 3: compact primary files
primaryRegion.compactStores();
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
+ cleaner.chore();
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 1c99fe3..e0c1453 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -212,7 +214,7 @@ public class TestRegionMergeTransactionOnCluster {
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor
.getTableRegionsAndLocations(master.getConnection(), tableName);
HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
- HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+ HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
tableName);
Result mergedRegionResult = MetaTableAccessor.getRegionResult(
master.getConnection(), mergedRegionInfo.getRegionName());
@@ -236,19 +238,34 @@ public class TestRegionMergeTransactionOnCluster {
assertTrue(fs.exists(regionAdir));
assertTrue(fs.exists(regionBdir));
+ HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
+ HRegionFileSystem hrfs = new HRegionFileSystem(
+ TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+ int count = 0;
+ for(HColumnDescriptor colFamily : columnFamilies) {
+ count += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
admin.compactRegion(mergedRegionInfo.getRegionName());
// wait until merged region doesn't have reference file
long timeout = System.currentTimeMillis() + waitTime;
- HRegionFileSystem hrfs = new HRegionFileSystem(
- TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
while (System.currentTimeMillis() < timeout) {
- if (!hrfs.hasReferences(tableDescritor)) {
+ if (!hrfs.hasReferences(tableDescriptor)) {
break;
}
Thread.sleep(50);
}
- assertFalse(hrfs.hasReferences(tableDescritor));
-
+ int newcount = 0;
+ for(HColumnDescriptor colFamily : columnFamilies) {
+ newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ assertTrue(newcount > count);
+ // clean up the merged region store files
+ List<HRegion> regions =
+ TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
+ for (HRegion region : regions) {
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+ cleaner.chore();
+ }
// run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions
int cleaned = admin.runCatalogScan();
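The merged-region test above shows the recurring pattern in this patch's test changes: compaction no longer deletes its input files inline, so store file counts rise until the discharger runs, and tests invoke it by hand rather than waiting for the scheduled period. A condensed sketch of that pattern (helper name illustrative):

import java.util.List;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;

final class ManualDischarge {
  /**
   * Archive compacted files immediately. The 100 ms period is never waited on
   * because chore() is called directly, and the Stoppable is unused here.
   */
  static void dischargeAll(List<HRegion> regions) {
    for (HRegion region : regions) {
      new CompactedHFilesDischarger(100, null, region).chore();
    }
  }
}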