Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/12 02:19:27 UTC

svn commit: r1230347 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: HConstants.java regionserver/HRegion.java regionserver/Store.java util/Threads.java

Author: tedyu
Date: Thu Jan 12 01:19:26 2012
New Revision: 1230347

URL: http://svn.apache.org/viewvc?rev=1230347&view=rev
Log:
HBASE-5033 Differential Revision: 933 Opening/Closing store in parallel to reduce region open/close time (Liyin)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java

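For reference, the patch applies the same fan-out/fan-in idiom in several
places (opening/closing stores in HRegion, opening/closing store files in
Store): submit one Callable per unit of work to an ExecutorCompletionService,
drain the results in completion order, and wrap any failure in an
IOException. A minimal, self-contained sketch of that idiom (class and task
names here are illustrative, not part of the patch):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelOpenSketch {
      public static void main(String[] args) throws IOException {
        List<String> families = Arrays.asList("cf1", "cf2", "cf3");
        ExecutorService pool = Executors.newFixedThreadPool(families.size());
        CompletionService<String> done =
            new ExecutorCompletionService<String>(pool);
        // Fan out: one task per column family.
        for (final String family : families) {
          done.submit(new Callable<String>() {
            public String call() {
              return "opened store for " + family; // stand-in for real work
            }
          });
        }
        try {
          // Fan in: collect results as they finish, not in submission order.
          for (int i = 0; i < families.size(); i++) {
            System.out.println(done.take().get());
          }
        } catch (InterruptedException e) {
          throw new IOException(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          pool.shutdownNow();
        }
      }
    }
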
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jan 12 01:19:26 2012
@@ -19,15 +19,15 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
  * HConstants holds a bunch of HBase-related constants
  */
@@ -218,6 +218,19 @@ public final class HConstants {
   public static final String HREGION_MAX_FILESIZE =
       "hbase.hregion.max.filesize";
 
+  /**
+   * The max number of threads used for opening and closing stores or store
+   * files in parallel
+   */
+  public static final String HSTORE_OPEN_AND_CLOSE_THREADS_MAX =
+    "hbase.hstore.open.and.close.threads.max";
+
+  /**
+   * The default maximum number of threads used for opening and closing
+   * stores or store files in parallel
+   */
+  public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
+
   /** Default maximum file size */
   public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
 

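The new key defaults to 1, which preserves the old sequential behavior. A
minimal sketch of raising the limit programmatically (operators would
normally set the same key in hbase-site.xml instead):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class ConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Allow up to 8 threads for opening/closing stores and store files.
        conf.setInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, 8);
        System.out.println(conf.getInt(
            HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
      }
    }
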
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jan 12 01:19:26 2012
@@ -40,10 +40,17 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,6 +113,7 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.Writable;
@@ -114,6 +122,7 @@ import org.cliffc.high_scale_lib.Counter
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MutableClassToInstanceMap;
@@ -433,7 +442,7 @@ public class HRegion implements HeapSize
    * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
    */
   public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
-      HRegionInfo regionInfo, final HTableDescriptor htd,
+    final HRegionInfo regionInfo, final HTableDescriptor htd,
       RegionServerServices rsServices) {
     this.tableDir = tableDir;
     this.comparator = regionInfo.getComparator();
@@ -542,20 +551,49 @@ public class HRegion implements HeapSize
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
-    for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
-      status.setStatus("Instantiating store for column family " + c);
-      Store store = instantiateHStore(this.tableDir, c);
-      this.stores.put(c.getName(), store);
-      long storeSeqId = store.getMaxSequenceId();
-      if (minSeqId == -1 || storeSeqId < minSeqId) {
-        minSeqId = storeSeqId;
-      }
-      if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-        maxSeqId = storeSeqId;
-      }
-      long maxStoreMemstoreTS = store.getMaxMemstoreTS();
-      if (maxStoreMemstoreTS > maxMemstoreTS) {
-        maxMemstoreTS = maxStoreMemstoreTS;
+
+    if (this.htableDescriptor != null &&
+        !htableDescriptor.getFamilies().isEmpty()) {
+      // initialize the thread pool for opening stores in parallel.
+      ThreadPoolExecutor storeOpenerThreadPool =
+        getStoreOpenAndCloseThreadPool(
+          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+      CompletionService<Store> completionService =
+        new ExecutorCompletionService<Store>(storeOpenerThreadPool);
+
+      // initialize each store in parallel
+      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
+        status.setStatus("Instantiating store for column family " + family);
+        completionService.submit(new Callable<Store>() {
+          public Store call() throws IOException {
+            return instantiateHStore(tableDir, family);
+          }
+        });
+      }
+      try {
+        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
+          Future<Store> future = completionService.take();
+          Store store = future.get();
+
+          this.stores.put(store.getColumnFamilyName().getBytes(), store);
+          long storeSeqId = store.getMaxSequenceId();
+          if (minSeqId == -1 || storeSeqId < minSeqId) {
+            minSeqId = storeSeqId;
+          }
+          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
+            maxSeqId = storeSeqId;
+          }
+          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+          if (maxStoreMemstoreTS > maxMemstoreTS) {
+            maxMemstoreTS = maxStoreMemstoreTS;
+          }
+        }
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        storeOpenerThreadPool.shutdownNow();
       }
     }
     mvcc.initialize(maxMemstoreTS + 1);
@@ -883,8 +921,38 @@ public class HRegion implements HeapSize
       }
 
       List<StoreFile> result = new ArrayList<StoreFile>();
-      for (Store store : stores.values()) {
-        result.addAll(store.close());
+      if (!stores.isEmpty()) {
+        // initialize the thread pool for closing stores in parallel.
+        ThreadPoolExecutor storeCloserThreadPool =
+          getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+            + this.regionInfo.getRegionNameAsString());
+        CompletionService<ImmutableList<StoreFile>> completionService =
+          new ExecutorCompletionService<ImmutableList<StoreFile>>(
+            storeCloserThreadPool);
+      
+        // close each store in parallel
+        for (final Store store : stores.values()) {
+          completionService
+              .submit(new Callable<ImmutableList<StoreFile>>() {
+                public ImmutableList<StoreFile> call() throws IOException {
+                  return store.close();
+                }
+              });
+        }
+        try {
+          for (int i = 0; i < stores.size(); i++) {
+            Future<ImmutableList<StoreFile>> future = completionService
+                .take();
+            ImmutableList<StoreFile> storeFileList = future.get();
+            result.addAll(storeFileList);
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        } finally {
+          storeCloserThreadPool.shutdownNow();
+        }
       }
       this.closed.set(true);
 
@@ -900,6 +968,40 @@ public class HRegion implements HeapSize
     }
   }
 
+  protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
+    int maxThreads = Math.min(numStores,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
+    int maxThreads = Math.max(1,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
+            / numStores);
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    ThreadPoolExecutor openAndCloseThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndCloseThreadPool;
+  }
+
    /**
     * @return True if its worth doing a flush before we put up the close flag.
     */

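The two sizing helpers above split one configured budget: the store-level
pool gets at most one thread per column family (capped by the configured
max), while the store-file-level pool divides the configured max across the
families, never dropping below one thread. A worked example with
illustrative numbers:

    public class PoolSizingSketch {
      public static void main(String[] args) {
        int configuredMax = 8; // hbase.hstore.open.and.close.threads.max
        int numStores = 3;     // column families in the table
        // Store-level pool (getStoreOpenAndCloseThreadPool): min(3, 8) = 3.
        System.out.println(Math.min(numStores, configuredMax));
        // Store-file-level pool (getStoreFileOpenAndCloseThreadPool):
        // max(1, 8 / 3) = 2 threads per store.
        System.out.println(Math.max(1, configuredMax / numStores));
      }
    }
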
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Jan 12 01:19:26 2012
@@ -27,7 +27,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -271,38 +277,73 @@ public class Store extends SchemaConfigu
     return homedir;
   }
 
-  /*
-   * Creates an unsorted list of StoreFile loaded from the given directory.
+  /**
+   * Creates an unsorted list of StoreFiles loaded in parallel
+   * from the given directory.
    * @throws IOException
    */
-  private List<StoreFile> loadStoreFiles()
-  throws IOException {
+  private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
     FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
-    for (int i = 0; files != null && i < files.length; i++) {
+
+    if (files == null || files.length == 0) {
+      return results;
+    }
+    // initialize the thread pool for opening store files in parallel.
+    ThreadPoolExecutor storeFileOpenerThreadPool =
+      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+          this.family.getNameAsString());
+    CompletionService<StoreFile> completionService =
+      new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
+    for (int i = 0; i < files.length; i++) {
       // Skip directories.
       if (files[i].isDir()) {
         continue;
       }
-      Path p = files[i].getPath();
-      // Check for empty file.  Should never be the case but can happen
+      final Path p = files[i].getPath();
+      // Check for empty file. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
       if (this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
-      StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
-      passSchemaMetricsTo(curfile);
-      curfile.createReader();
-      long length = curfile.getReader().length();
-      this.storeSize += length;
-      this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loaded " + curfile.toStringDetailed());
+
+      // open each store file in parallel
+      completionService.submit(new Callable<StoreFile>() {
+        public StoreFile call() throws IOException {
+          StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
+              family.getBloomFilterType());
+          passSchemaMetricsTo(storeFile);
+          storeFile.createReader();
+          return storeFile;
+        }
+      });
+      totalValidStoreFile++;
+    }
+
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        Future<StoreFile> future = completionService.take();
+        StoreFile storeFile = future.get();
+        long length = storeFile.getReader().length();
+        this.storeSize += length;
+        this.totalUncompressedBytes +=
+          storeFile.getReader().getTotalUncompressedBytes();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("loaded " + storeFile.toStringDetailed());
+        }
+        results.add(storeFile);
       }
-      results.add(curfile);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
     }
+
     return results;
   }
 
@@ -499,8 +540,36 @@ public class Store extends SchemaConfigu
       // Clear so metrics doesn't find them.
       storefiles = ImmutableList.of();
 
-      for (StoreFile f: result) {
-        f.closeReader(true);
+      if (!result.isEmpty()) {
+        // initialize the thread pool for closing store files in parallel.
+        ThreadPoolExecutor storeFileCloserThreadPool = this.region
+            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+                + this.family.getNameAsString());
+  
+        // close each store file in parallel
+        CompletionService<Void> completionService =
+          new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
+        for (final StoreFile f : result) {
+          completionService.submit(new Callable<Void>() {
+            public Void call() throws IOException {
+              f.closeReader(true);
+              return null;
+            }
+          });
+        }
+  
+        try {
+          for (int i = 0; i < result.size(); i++) {
+            Future<Void> future = completionService.take();
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        } finally {
+          storeFileCloserThreadPool.shutdownNow();
+        }
       }
       LOG.debug("closed " + this.storeNameStr);
       return result;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java Thu Jan 12 01:19:26 2012
@@ -19,14 +19,17 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /**
  * Thread Utility
  */
@@ -152,4 +155,25 @@ public class Threads {
     }
   }
 
+  /**
+   * Create a new CachedThreadPool with a bounded maximum number of
+   * threads in the pool.
+   *
+   * @param maxCachedThread the maximum number of threads that can be
+   * created in the pool
+   * @param timeout the maximum time that excess idle threads will wait
+   * for new tasks before terminating
+   * @param unit the time unit of the timeout argument
+   * @param threadFactory the factory to use when creating new threads
+   * @return a cached thread pool bounded at maxCachedThread threads
+   */
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(
+      int maxCachedThread, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor boundedCachedThreadPool =
+      new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
+        unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
+    // allow the core pool threads timeout and terminate
+    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+    return boundedCachedThreadPool;
+  }
 }
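
A minimal usage sketch of the new utility; the anonymous ThreadFactory
mirrors the one HRegion installs, and the pool shrinks back to zero threads
after 30 idle seconds because core threads are allowed to time out:

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.util.Threads;

    public class BoundedPoolSketch {
      public static void main(String[] args) {
        ThreadPoolExecutor pool = Threads.getBoundedCachedThreadPool(
            4, 30L, TimeUnit.SECONDS, new ThreadFactory() {
              private int count = 1;
              public Thread newThread(Runnable r) {
                return new Thread(r, "sketch-worker-" + count++);
              }
            });
        pool.submit(new Runnable() {
          public void run() {
            System.out.println("ran on " + Thread.currentThread().getName());
          }
        });
        pool.shutdown();
      }
    }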