You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/03/18 23:34:00 UTC

svn commit: r1458049 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver: StoreFile.java compactions/Compactor.java compactions/DefaultCompactor.java

Author: sershe
Date: Mon Mar 18 22:33:59 2013
New Revision: 1458049

URL: http://svn.apache.org/r1458049
Log:
HBASE-8080 refactor default compactor to make its parts easier to reuse

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1458049&r1=1458048&r2=1458049&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Mon Mar 18 22:33:59 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -707,7 +708,7 @@ public class StoreFile {
    * A StoreFile writer.  Use this to read/write HBase Store Files. It is package
    * local because it is an implementation detail of the HBase regionserver.
    */
-  public static class Writer {
+  public static class Writer implements Compactor.CellSink {
     private final BloomFilterWriter generalBloomFilterWriter;
     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
     private final BloomType bloomType;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1458049&r1=1458048&r2=1458049&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Mon Mar 18 22:33:59 2013
@@ -18,25 +18,59 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A compactor is a compaction algorithm associated a given policy.
+ * A compactor is a compaction algorithm associated a given policy. Base class also contains
+ * reusable parts for implementing compactors (what is common and what isn't is evolving).
  */
 @InterfaceAudience.Private
 public abstract class Compactor {
-
+  private static final Log LOG = LogFactory.getLog(Compactor.class);
   protected CompactionProgress progress;
   protected Configuration conf;
+  protected Store store;
+
+  private int compactionKVMax;
+  protected Compression.Algorithm compactionCompression;
 
-  Compactor(final Configuration conf) {
+  //TODO: depending on Store is not good but, realistically, all compactors currently do.
+  Compactor(final Configuration conf, final Store store) {
     this.conf = conf;
+    this.store = store;
+    this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+    this.compactionCompression = (this.store.getFamily() == null) ?
+        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
+  }
+
+  /**
+   * TODO: Replace this with {@link CellOutputStream} when StoreFile.Writer uses cells.
+   */
+  public interface CellSink {
+    void append(KeyValue kv) throws IOException;
   }
 
   /**
@@ -68,4 +102,137 @@ public abstract class Compactor {
   public CompactionProgress getProgress() {
     return this.progress;
   }
+
+  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
+  protected static class FileDetails {
+    /** Maximum key count after compaction (for blooms) */
+    public int maxKeyCount = 0;
+    /** Earliest put timestamp if major compaction */
+    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    /** The last key in the files we're compacting. */
+    public long maxSeqId = 0;
+  }
+
+  protected FileDetails getFileDetails(
+      Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
+    FileDetails fd = new FileDetails();
+
+    for (StoreFile file : filesToCompact) {
+      long seqNum = file.getMaxSequenceId();
+      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        LOG.warn("Null reader for " + file.getPath());
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      // switches bloom type (e.g. from ROW to ROWCOL)
+      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
+          ? r.getFilterEntries() : r.getEntries();
+      fd.maxKeyCount += keyCount;
+      // If required, calculate the earliest put timestamp of all involved storefiles.
+      // This is used to remove family delete marker during compaction.
+      long earliestPutTs = 0;
+      if (calculatePutTs) {
+        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // There's a file with no information, must be an old one
+          // assume we have very old puts
+          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Bytes.toLong(tmp);
+          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+          ", keycount=" + keyCount +
+          ", bloomtype=" + r.getBloomFilterType().toString() +
+          ", size=" + StringUtils.humanReadableInt(r.length()) +
+          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+          ", seqNum=" + seqNum +
+          (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
+      }
+    }
+    return fd;
+  }
+
+  protected List<StoreFileScanner> createFileScanners(
+      final Collection<StoreFile> filesToCompact) throws IOException {
+    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true);
+  }
+
+  protected long setSmallestReadPoint() {
+    long smallestReadPoint = store.getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    return smallestReadPoint;
+  }
+
+  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
+      ScanType scanType, long earliestPutTs,  List<StoreFileScanner> scanners) throws IOException {
+    if (store.getCoprocessorHost() == null) return null;
+    return store.getCoprocessorHost()
+        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
+  }
+
+  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
+      ScanType scanType, InternalScanner scanner) throws IOException {
+    if (store.getCoprocessorHost() == null) return scanner;
+    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+  }
+
+  @SuppressWarnings("deprecation")
+  protected boolean performCompaction(InternalScanner scanner,
+      CellSink writer, long smallestReadPoint) throws IOException {
+    int bytesWritten = 0;
+    // Since scanner.next() can return 'false' but still be delivering data,
+    // we have to use a do/while loop.
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+    int closeCheckInterval = HStore.getCloseCheckInterval();
+    boolean hasMore;
+    do {
+      hasMore = scanner.next(kvs, compactionKVMax);
+      // output to writer:
+      for (KeyValue kv : kvs) {
+        if (kv.getMemstoreTS() <= smallestReadPoint) {
+          kv.setMemstoreTS(0);
+        }
+        writer.append(kv);
+        // update progress per key
+        ++progress.currentCompactedKVs;
+
+        // check periodically to see if a system stop is requested
+        if (closeCheckInterval > 0) {
+          bytesWritten += kv.getLength();
+          if (bytesWritten > closeCheckInterval) {
+            bytesWritten = 0;
+            if (!store.areWritesEnabled()) return false;
+          }
+        }
+      }
+      kvs.clear();
+    } while (hasMore);
+    return true;
+  }
+
+  protected void abortWriter(final StoreFile.Writer writer) throws IOException {
+    writer.close();
+    store.getFileSystem().delete(writer.getPath(), false);
+  }
+
+  /**
+   * @param scanners Store file scanners.
+   * @param scanType Scan type.
+   * @param smallestReadPoint Smallest MVCC read point.
+   * @param earliestPutTs Earliest put across all files.
+   * @return A compaction scanner.
+   */
+  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getFamily().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+        scanType, smallestReadPoint, earliestPutTs);
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1458049&r1=1458048&r2=1458049&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Mon Mar 18 22:33:59 2013
@@ -49,170 +49,62 @@ import org.apache.hadoop.util.StringUtil
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
   private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
-  private final Store store;
 
   public DefaultCompactor(final Configuration conf, final Store store) {
-    super(conf);
-    this.store = store;
+    super(conf, store);
   }
 
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
-  @SuppressWarnings("deprecation")
-  @Override
   public List<Path> compact(final CompactionRequest request) throws IOException {
-    final Collection<StoreFile> filesToCompact = request.getFiles();
-    boolean majorCompaction = request.isMajor();
-    // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
+    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
 
-    // Calculate maximum key count after compaction (for blooms)
-    // Also calculate earliest put timestamp if major compaction
-    int maxKeyCount = 0;
-    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
-    ScanType scanType = request.isMajor()
-        ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
-    for (StoreFile file: filesToCompact) {
-      StoreFile.Reader r = file.getReader();
-      if (r == null) {
-        LOG.warn("Null reader for " + file.getPath());
-        continue;
-      }
-      // NOTE: getFilterEntries could cause under-sized blooms if the user
-      // switches bloom type (e.g. from ROW to ROWCOL)
-      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
-        r.getFilterEntries() : r.getEntries();
-      maxKeyCount += keyCount;
-      // For major compactions calculate the earliest put timestamp of all
-      // involved storefiles. This is used to remove family delete marker during
-      // compaction.
-      if (scanType == ScanType.COMPACT_DROP_DELETES) {
-        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
-        if (tmp == null) {
-          // There's a file with no information, must be an old one
-          // assume we have very old puts
-          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
-        } else {
-          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Compacting " + file +
-          ", keycount=" + keyCount +
-          ", bloomtype=" + r.getBloomFilterType().toString() +
-          ", size=" + StringUtils.humanReadableInt(r.length()) +
-          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
-          (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
-      }
-    }
-
-    // keep track of compaction progress
-    this.progress = new CompactionProgress(maxKeyCount);
-
-    // For each file, obtain a scanner:
-    List<StoreFileScanner> scanners = StoreFileScanner
-      .getScannersForStoreFiles(filesToCompact, false, false, true);
+    List<StoreFileScanner> scanners = createFileScanners(request.getFiles());
 
-    // Get some configs
-    int compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
-    Compression.Algorithm compression = store.getFamily().getCompression();
-    // Avoid overriding compression setting for major compactions if the user
-    // has not specified it separately
-    Compression.Algorithm compactionCompression =
-      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
-      store.getFamily().getCompactionCompression(): compression;
-    // Make the instantiation lazy in case compaction produces no product; i.e.
-    // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
     List<Path> newFiles = new ArrayList<Path>();
     // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = store.getSmallestReadPoint();
-    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    long smallestReadPoint = setSmallestReadPoint();
     try {
       InternalScanner scanner = null;
       try {
-        if (store.getCoprocessorHost() != null) {
-          scanner = store.getCoprocessorHost()
-              .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
-        }
+        /* Include deletes, unless we are doing a major compaction */
+        ScanType scanType =
+            request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
+        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
         if (scanner == null) {
-          Scan scan = new Scan();
-          scan.setMaxVersions(store.getFamily().getMaxVersions());
-          /* Include deletes, unless we are doing a major compaction */
-          scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
-            scanType, smallestReadPoint, earliestPutTs);
-        }
-        if (store.getCoprocessorHost() != null) {
-          InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
-            scanType, request);
-          // NULL scanner returned from coprocessor hooks means skip normal processing
-          if (cpScanner == null) {
-            return newFiles; // an empty list
-          }
-          scanner = cpScanner;
+          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
         }
-
-        int bytesWritten = 0;
-        // Since scanner.next() can return 'false' but still be delivering data,
-        // we have to use a do/while loop.
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
-        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-        int closeCheckInterval = HStore.getCloseCheckInterval();
-        boolean hasMore;
-        do {
-          hasMore = scanner.next(kvs, compactionKVMax);
-          // Create the writer even if no kv(Empty store file is also ok),
-          // because we need record the max seq id for the store file, see
-          // HBASE-6059
-          if (writer == null) {
-            writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
-          }
-          if (writer != null) {
-            // output to writer:
-            for (KeyValue kv : kvs) {
-              if (kv.getMemstoreTS() <= smallestReadPoint) {
-                kv.setMemstoreTS(0);
-              }
-              writer.append(kv);
-              // update progress per key
-              ++progress.currentCompactedKVs;
-
-              // check periodically to see if a system stop is requested
-              if (closeCheckInterval > 0) {
-                bytesWritten += kv.getLength();
-                if (bytesWritten > closeCheckInterval) {
-                  bytesWritten = 0;
-                  isInterrupted(store, writer);
-                }
-              }
-            }
-          }
-          kvs.clear();
-        } while (hasMore);
-      } finally {
-        if (scanner != null) {
-          scanner.close();
+        scanner = postCreateCoprocScanner(request, scanType, scanner);
+        if (scanner == null) {
+          // NULL scanner returned from coprocessor hooks means skip normal processing.
+          return newFiles;
         }
+        // Create the writer even if no kv(Empty store file is also ok),
+        // because we need record the max seq id for the store file, see HBASE-6059
+        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
+        boolean finished = performCompaction(scanner, writer, smallestReadPoint);
+        if (!finished) {
+          abortWriter(writer);
+          writer = null;
+          throw new InterruptedIOException( "Aborting compaction of store " + store +
+              " in region " + store.getRegionInfo().getRegionNameAsString() +
+              " because it was interrupted.");
+         }
+       } finally {
+         if (scanner != null) {
+           scanner.close();
+         }
       }
     } finally {
       if (writer != null) {
-        writer.appendMetadata(maxId, majorCompaction);
+        writer.appendMetadata(fd.maxSeqId, request.isMajor());
         writer.close();
         newFiles.add(writer.getPath());
       }
     }
     return newFiles;
   }
-
-  void isInterrupted(final Store store, final StoreFile.Writer writer)
-  throws IOException {
-    if (store.areWritesEnabled()) return;
-    // Else cleanup.
-    writer.close();
-    store.getFileSystem().delete(writer.getPath(), false);
-    throw new InterruptedIOException( "Aborting compaction of store " + store +
-      " in region " + store.getRegionInfo().getRegionNameAsString() +
-      " because it was interrupted.");
-  }
 }