You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/24 00:49:33 UTC
[13/50] [abbrv] hbase git commit: HBASE-14636 Clear
HFileScannerImpl#prevBlocks in between Compaction flow.
HBASE-14636 Clear HFileScannerImpl#prevBlocks in between Compaction flow.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9523a56
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9523a56
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9523a56
Branch: refs/heads/hbase-12439
Commit: c9523a569d45e9edc2c2d7b8d4d9cbf05f46a100
Parents: 51693b9
Author: anoopsjohn <an...@gmail.com>
Authored: Tue Oct 20 13:06:09 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Tue Oct 20 13:06:09 2015 +0530
----------------------------------------------------------------------
.../hadoop/hbase/io/hfile/HFileBlock.java | 7 ++++
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 6 ++-
.../regionserver/compactions/Compactor.java | 43 ++++++++++++++------
3 files changed, 42 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 4fd32a4..a68d0a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1938,6 +1938,13 @@ public class HFileBlock implements Cacheable {
}
/**
+ * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
+ */
+ public boolean usesSharedMemory() {
+ return this.memType == MemoryType.SHARED;
+ }
+
+ /**
* Convert the contents of the block header into a human readable string.
* This is mostly helpful for debugging. This assumes that the block
* has minor version > 0.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 6970d27..5af72b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -510,14 +510,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
block.getOffset() == this.curBlock.getOffset()) {
return;
}
- if (this.curBlock != null) {
+ // We don't have to keep a ref to an EXCLUSIVE type of block
+ if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
}
void reset() {
- if (this.curBlock != null) {
+ // We don't have to keep a ref to an EXCLUSIVE type of block
+ if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
this.prevBlocks.add(this.curBlock);
}
this.curBlock = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 873d827..eaccd0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -58,13 +59,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@InterfaceAudience.Private
public abstract class Compactor {
private static final Log LOG = LogFactory.getLog(Compactor.class);
+ private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected CompactionProgress progress;
protected Configuration conf;
protected Store store;
protected int compactionKVMax;
protected Compression.Algorithm compactionCompression;
-
+
/** specify how many days to keep MVCC values during major compaction **/
protected int keepSeqIdPeriod;
@@ -272,12 +274,13 @@ public abstract class Compactor {
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId,
CompactionThroughputController throughputController, boolean major) throws IOException {
- long bytesWritten = 0;
- long bytesWrittenProgress = 0;
+ long bytesWrittenProgressForCloseCheck = 0;
+ long bytesWrittenProgressForLog = 0;
+ long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<Cell>();
- long closeCheckInterval = HStore.getCloseCheckInterval();
+ long closeCheckSizeLimit = HStore.getCloseCheckInterval();
long lastMillis = 0;
if (LOG.isDebugEnabled()) {
lastMillis = EnvironmentEdgeManager.currentTime();
@@ -289,6 +292,11 @@ public abstract class Compactor {
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
+ KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
+ int minFilesToCompact = Math.max(2,
+ conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
+ /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+ long shippedCallSizeLimit = minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
try {
do {
hasMore = scanner.next(cells, scannerContext);
@@ -304,35 +312,46 @@ public abstract class Compactor {
int len = KeyValueUtil.length(c);
++progress.currentCompactedKVs;
progress.totalCompactedSize += len;
+ bytesWrittenProgressForShippedCall += len;
if (LOG.isDebugEnabled()) {
- bytesWrittenProgress += len;
+ bytesWrittenProgressForLog += len;
}
throughputController.control(compactionName, len);
// check periodically to see if a system stop is requested
- if (closeCheckInterval > 0) {
- bytesWritten += len;
- if (bytesWritten > closeCheckInterval) {
- bytesWritten = 0;
+ if (closeCheckSizeLimit > 0) {
+ bytesWrittenProgressForCloseCheck += len;
+ if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
+ bytesWrittenProgressForCloseCheck = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
}
}
}
+ if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+ // The SHARED block references, being read for compaction, will be kept in prevBlocks
+ // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells
+ // is returned to the client, we call shipped(), which can clear this list. Here we
+ // do a similar thing: in between the compaction (after every N cells written, with a
+ // collective size of 'shippedCallSizeLimit') we call shipped(), which
+ // may clear the prevBlocks list.
+ kvs.shipped();
+ bytesWrittenProgressForShippedCall = 0;
+ }
}
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
- if ((now - lastMillis) >= 60 * 1000) {
+ if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
- + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
+ + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
lastMillis = now;
- bytesWrittenProgress = 0;
+ bytesWrittenProgressForLog = 0;
}
}
cells.clear();