You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/24 00:49:33 UTC

[13/50] [abbrv] hbase git commit: HBASE-14636 Clear HFileScannerImpl#prevBlocks in between Compaction flow.

HBASE-14636 Clear HFileScannerImpl#prevBlocks in between Compaction flow.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9523a56
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9523a56
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9523a56

Branch: refs/heads/hbase-12439
Commit: c9523a569d45e9edc2c2d7b8d4d9cbf05f46a100
Parents: 51693b9
Author: anoopsjohn <an...@gmail.com>
Authored: Tue Oct 20 13:06:09 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Tue Oct 20 13:06:09 2015 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  7 ++++
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  6 ++-
 .../regionserver/compactions/Compactor.java     | 43 ++++++++++++++------
 3 files changed, 42 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 4fd32a4..a68d0a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1938,6 +1938,13 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
+   * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
+   */
+  public boolean usesSharedMemory() {
+    return this.memType == MemoryType.SHARED;
+  }
+
+  /**
    * Convert the contents of the block header into a human readable string.
    * This is mostly helpful for debugging. This assumes that the block
    * has minor version > 0.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 6970d27..5af72b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -510,14 +510,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           block.getOffset() == this.curBlock.getOffset()) {
         return;
       }
-      if (this.curBlock != null) {
+      // We don't have to keep a reference to an EXCLUSIVE type of block
+      if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
         prevBlocks.add(this.curBlock);
       }
       this.curBlock = block;
     }
 
     void reset() {
-      if (this.curBlock != null) {
+      // We don't have to keep a reference to an EXCLUSIVE type of block
+      if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
         this.prevBlocks.add(this.curBlock);
       }
       this.curBlock = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 873d827..eaccd0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -58,13 +59,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 @InterfaceAudience.Private
 public abstract class Compactor {
   private static final Log LOG = LogFactory.getLog(Compactor.class);
+  private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
   protected CompactionProgress progress;
   protected Configuration conf;
   protected Store store;
 
   protected int compactionKVMax;
   protected Compression.Algorithm compactionCompression;
-  
+
   /** specify how many days to keep MVCC values during major compaction **/ 
   protected int keepSeqIdPeriod;
 
@@ -272,12 +274,13 @@ public abstract class Compactor {
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId,
       CompactionThroughputController throughputController, boolean major) throws IOException {
-    long bytesWritten = 0;
-    long bytesWrittenProgress = 0;
+    long bytesWrittenProgressForCloseCheck = 0;
+    long bytesWrittenProgressForLog = 0;
+    long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<Cell>();
-    long closeCheckInterval = HStore.getCloseCheckInterval();
+    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
       lastMillis = EnvironmentEdgeManager.currentTime();
@@ -289,6 +292,11 @@ public abstract class Compactor {
         ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
 
     throughputController.start(compactionName);
+    KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
+    int minFilesToCompact = Math.max(2,
+        conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
+            /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+    long shippedCallSizeLimit = minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
     try {
       do {
         hasMore = scanner.next(cells, scannerContext);
@@ -304,35 +312,46 @@ public abstract class Compactor {
           int len = KeyValueUtil.length(c);
           ++progress.currentCompactedKVs;
           progress.totalCompactedSize += len;
+          bytesWrittenProgressForShippedCall += len;
           if (LOG.isDebugEnabled()) {
-            bytesWrittenProgress += len;
+            bytesWrittenProgressForLog += len;
           }
           throughputController.control(compactionName, len);
           // check periodically to see if a system stop is requested
-          if (closeCheckInterval > 0) {
-            bytesWritten += len;
-            if (bytesWritten > closeCheckInterval) {
-              bytesWritten = 0;
+          if (closeCheckSizeLimit > 0) {
+            bytesWrittenProgressForCloseCheck += len;
+            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
+              bytesWrittenProgressForCloseCheck = 0;
               if (!store.areWritesEnabled()) {
                 progress.cancel();
                 return false;
               }
             }
           }
+          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+            // The SHARED block references, being read for compaction, will be kept in prevBlocks
+            // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
+            // returned to the client, we call shipped(), which can clear this list. Here we do
+            // a similar thing: during the compaction (after every N cells written with a
+            // collective size of 'shippedCallSizeLimit') we call shipped(), which may clear the
+            // prevBlocks list.
+            kvs.shipped();
+            bytesWrittenProgressForShippedCall = 0;
+          }
         }
         // Log the progress of long running compactions every minute if
         // logging at DEBUG level
         if (LOG.isDebugEnabled()) {
-          if ((now - lastMillis) >= 60 * 1000) {
+          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
             LOG.debug("Compaction progress: "
                 + compactionName
                 + " "
                 + progress
-                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
+                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
                     / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                 + throughputController);
             lastMillis = now;
-            bytesWrittenProgress = 0;
+            bytesWrittenProgressForLog = 0;
           }
         }
         cells.clear();