You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2012/01/18 04:07:06 UTC

svn commit: r1232729 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: HConstants.java master/HMaster.java regionserver/HRegion.java regionserver/Store.java util/Threads.java

Author: nspiegelberg
Date: Wed Jan 18 03:07:06 2012
New Revision: 1232729

URL: http://svn.apache.org/viewvc?rev=1232729&view=rev
Log:
[jira][HBASE-5033] Opening/Closing store in parallel to reduce region open/close
time

Summary:
The public revision: https://reviews.facebook.net/D933

Region servers open and close each store, and each store file within every
store, sequentially, which makes opening and closing regions inefficient.

This diff therefore opens and closes stores in parallel to reduce region
open/close time. It should also reduce cluster restart time.

1) Opening each store in parallel
2) Loading each store file for every store in parallel
3) Closing each store in parallel
4) Closing each store file for every store in parallel.

Test Plan:
1) Test it on the dark launch cluster by replacing one region server's hbase
jar.

The test showed that the restart time of a single region server (22 regions)
decreased from 78 sec to 55 sec, i.e. a saving of roughly 29% of region server
restart time.

After this is committed, I can evaluate the improvement in whole-cluster restart
time on the dark launch cluster.

2) Running all the unit tests
3) Test it on dev cluster

Reviewers: kannan, kranganathan

Reviewed By: kannan

CC: hbase-eng@lists, kannan

Differential Revision: https://phabricator.fb.com/D382456

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1232729&r1=1232728&r2=1232729&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Jan 18 03:07:06 2012
@@ -191,6 +191,19 @@ public final class HConstants {
   /** File Extension used while splitting an HLog into regions (HBASE-2312) */
   public static final String HLOG_SPLITTING_EXT = "-splitting";
 
+  /**
+   * Configuration key for the maximum number of threads used to open or
+   * close the stores of a region, and the store files of a store, in parallel.
+   */
+  public static final String HSTORE_OPEN_AND_CLOSE_THREADS_MAX =
+    "hbase.hstore.open.and.close.threads.max";
+
+  /**
+   * Default value of {@link #HSTORE_OPEN_AND_CLOSE_THREADS_MAX}, used when
+   * that key is not set in the configuration.
+   */
+  public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 8;
+
   /** Default maximum file size */
   public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1232729&r1=1232728&r2=1232729&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jan 18 03:07:06 2012
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -107,6 +106,7 @@ import org.apache.hadoop.hbase.util.Info
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
 import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -300,16 +300,12 @@ public class HMaster extends Thread impl
     }
 
     // initilize the thread pool for log splitting.
-    int maxSplitLogThread = conf
-        .getInt("hbase.master.splitLogThread.max", 1000);
-    // Unfortunately Executors.newCachedThreadPool does not allow us to
-    // set the maximum size of the pool, so we have to do it ourselves.
-    logSplitThreadPool = new ThreadPoolExecutor(maxSplitLogThread,
+    int maxSplitLogThread =
+      conf.getInt("hbase.master.splitLogThread.max", 1000);
+    logSplitThreadPool = Threads.getBoundedCachedThreadPool(
         maxSplitLogThread, 30L, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
         new ThreadFactory() {
           private int count = 1;
-
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r, "LogSplittingThread" + "-" + count++);
             if (!t.isDaemon())
@@ -317,8 +313,6 @@ public class HMaster extends Thread impl
             return t;
           }
         });
-    // allow the core pool threads timeout and terminate
-    logSplitThreadPool.allowCoreThreadTimeOut(true);
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1232729&r1=1232728&r2=1232729&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jan 18 03:07:06 2012
@@ -39,9 +39,16 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -89,12 +96,14 @@ import org.apache.hadoop.hbase.util.Clas
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -423,7 +432,7 @@ public class HRegion implements HeapSize
 
    */
   public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
-      HRegionInfo regionInfo, FlushRequester flushListener) {
+      final HRegionInfo regionInfo, FlushRequester flushListener) {
     this.tableDir = tableDir;
     this.comparator = regionInfo.getComparator();
     this.log = log;
@@ -510,20 +519,50 @@ public class HRegion implements HeapSize
       long maxSeqId = -1;
       // initialized to -1 so that we pick up MemstoreTS from column families
       long maxMemstoreTS = -1;
-      for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-        status.setStatus("Instantiating store for column family " + c);
-        Store store = instantiateHStore(this.tableDir, c);
-        this.stores.put(c.getName(), store);
-        long storeSeqId = store.getMaxSequenceId();
-        if (minSeqId == -1 || storeSeqId < minSeqId) {
-          minSeqId = storeSeqId;
-        }
-        if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-          maxSeqId = storeSeqId;
-        }
-        long maxStoreMemstoreTS = store.getMaxMemstoreTS();
-        if (maxStoreMemstoreTS > maxMemstoreTS) {
-          maxMemstoreTS = maxStoreMemstoreTS;
+      Collection<HColumnDescriptor> families =
+        this.regionInfo.getTableDesc().getFamilies();
+
+      if (!families.isEmpty()) {
+        // initialize the thread pool for opening stores in parallel.
+        ThreadPoolExecutor storeOpenerThreadPool =
+          getStoreOpenAndCloseThreadPool(
+            "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+        CompletionService<Store> completionService =
+          new ExecutorCompletionService<Store>(storeOpenerThreadPool);
+
+        // initialize each store in parallel
+        for (final HColumnDescriptor family : families) {
+          status.setStatus("Instantiating store for column family " + family);
+          completionService.submit(new Callable<Store>() {
+            public Store call() throws IOException {
+              return instantiateHStore(tableDir, family);
+            }
+          });
+        }
+        try {
+          for (int i = 0; i < families.size(); i++) {
+            Future<Store> future = completionService.take();
+            Store store = future.get();
+
+            this.stores.put(store.getColumnFamilyName().getBytes(), store);
+            long storeSeqId = store.getMaxSequenceId();
+            if (minSeqId == -1 || storeSeqId < minSeqId) {
+              minSeqId = storeSeqId;
+            }
+            if (maxSeqId == -1 || storeSeqId > maxSeqId) {
+              maxSeqId = storeSeqId;
+            }
+            long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+            if (maxStoreMemstoreTS > maxMemstoreTS) {
+              maxMemstoreTS = maxStoreMemstoreTS;
+            }
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        } finally {
+          storeOpenerThreadPool.shutdownNow();
         }
       }
       rwcc.initialize(maxMemstoreTS + 1);
@@ -748,10 +787,40 @@ public class HRegion implements HeapSize
           if (!abort) {
             internalFlushcache(status);
           }
-
           List<StoreFile> result = new ArrayList<StoreFile>();
-          for (Store store: stores.values()) {
-            result.addAll(store.close());
+
+          if (!stores.isEmpty()) {
+            // initialize the thread pool for closing stores in parallel.
+            ThreadPoolExecutor storeCloserThreadPool =
+              getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+                + this.regionInfo.getRegionNameAsString());
+            CompletionService<ImmutableList<StoreFile>> completionService =
+              new ExecutorCompletionService<ImmutableList<StoreFile>>(
+                storeCloserThreadPool);
+
+            // close each store in parallel
+            for (final Store store : stores.values()) {
+              completionService
+                  .submit(new Callable<ImmutableList<StoreFile>>() {
+                    public ImmutableList<StoreFile> call() throws IOException {
+                      return store.close();
+                    }
+                  });
+            }
+            try {
+              for (int i = 0; i < stores.size(); i++) {
+                Future<ImmutableList<StoreFile>> future = completionService
+                    .take();
+                ImmutableList<StoreFile> storeFileList = future.get();
+                result.addAll(storeFileList);
+              }
+            } catch (InterruptedException e) {
+              throw new IOException(e);
+            } catch (ExecutionException e) {
+              throw new IOException(e.getCause());
+            } finally {
+              storeCloserThreadPool.shutdownNow();
+            }
           }
           this.closed.set(true);
           status.markComplete("Closed");
@@ -768,6 +837,40 @@ public class HRegion implements HeapSize
     }
   }
 
+  protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.regionInfo.getTableDesc().getFamilies().size());
+    int configured = conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+        HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX);
+    int maxThreads = Math.max(1, Math.min(numStores, configured)); // never < 1
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.regionInfo.getTableDesc().getFamilies().size());
+    int configured = conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+        HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX);
+    // divide the budget among stores handled in parallel; at least 1 thread
+    int maxThreads = Math.max(1, configured / numStores);
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    ThreadFactory namedThreadFactory = new ThreadFactory() {
+      private int count = 1;
+
+      public Thread newThread(Runnable r) {
+        String threadName = threadNamePrefix + "-" + count++;
+        return new Thread(r, threadName);
+      }
+    };
+    // idle pool threads are reclaimed after 30 seconds
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L,
+        TimeUnit.SECONDS, namedThreadFactory);
+  }
+
    /**
     * @return True if its worth doing a flush before we put up the close flag.
     */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1232729&r1=1232728&r2=1232729&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Jan 18 03:07:06 2012
@@ -29,7 +29,13 @@ import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -270,36 +276,69 @@ public class Store extends SchemaConfigu
     return homedir;
   }
 
-  /*
-   * Creates an unsorted list of StoreFile loaded from the given directory.
+  /**
+   * Opens every non-empty store file found in the store's home directory,
+   * in parallel, and returns them as an unsorted list.
    * @throws IOException
    */
-  private List<StoreFile> loadStoreFiles()
-  throws IOException {
+  private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
     FileStatus files[] = this.fs.listStatus(this.homedir);
+    // a null listing is treated as empty, as the loop below also tolerates
+    if (files == null || files.length == 0) {
+      return results;
+    }
+    // initialize the thread pool for opening store files in parallel.
+    ThreadPoolExecutor storeFileOpenerThreadPool =
+      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+          this.family.getNameAsString());
+    CompletionService<StoreFile> completionService =
+      new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
     for (int i = 0; files != null && i < files.length; i++) {
       // Skip directories.
       if (files[i].isDir()) {
         continue;
       }
-      Path p = files[i].getPath();
-      // Check for empty file.  Should never be the case but can happen
+      final Path p = files[i].getPath();
+      // Check for empty file. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
       if (this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
-      StoreFile curfile = new StoreFile(fs, p, blockcache, this.conf,
-          this.family.getBloomFilterType(), this.inMemory);
-      curfile.createReader();
-      long length = curfile.getReader().length();
-      this.storeSize += length;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loaded " + curfile.toStringDetailed());
+      // open each store file in parallel
+      completionService.submit(new Callable<StoreFile>() {
+        public StoreFile call() throws IOException {
+          StoreFile storeFile = new StoreFile(fs, p, blockcache, conf, family
+              .getBloomFilterType(), inMemory);
+          storeFile.createReader();
+          return storeFile;
+        }
+      });
+      totalValidStoreFile++;
+    }
+
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        Future<StoreFile> future = completionService.take();
+        StoreFile storeFile = future.get();
+        long length = storeFile.getReader().length();
+        this.storeSize += length;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("loaded " + storeFile.toStringDetailed());
+        }
+        results.add(storeFile);
       }
-      results.add(curfile);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
     }
+
     return results;
   }
 
@@ -439,9 +478,36 @@ public class Store extends SchemaConfigu
 
       // Clear so metrics doesn't find them.
       storefiles = ImmutableList.of();
+      if (!result.isEmpty()) {
+        // initialize the thread pool for closing store files in parallel.
+        ThreadPoolExecutor storeFileCloserThreadPool = this.region
+            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+                + this.family.getNameAsString());
+
+        // close each store file in parallel
+        CompletionService<Void> completionService =
+          new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
+        for (final StoreFile f : result) {
+          completionService.submit(new Callable<Void>() {
+            public Void call() throws IOException {
+              f.closeReader();
+              return null;
+            }
+          });
+        }
 
-      for (StoreFile f: result) {
-        f.closeReader();
+        try {
+          for (int i = 0; i < result.size(); i++) {
+            Future<Void> future = completionService.take();
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        } finally {
+          storeFileCloserThreadPool.shutdownNow();
+        }
       }
       LOG.debug("closed " + this.storeNameStr);
       return result;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1232729&r1=1232728&r2=1232729&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Wed Jan 18 03:07:06 2012
@@ -19,14 +19,17 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /**
  * Thread Utility
  */
@@ -144,4 +147,25 @@ public class Threads {
     }
   }
 
+  /**
+   * Create a cached-style pool that never runs more than maxCachedThread
+   * threads; tasks beyond that wait in an unbounded queue.
+   *
+   * @param maxCachedThread the maximum number of threads in the pool (> 0)
+   * @param timeout how long an idle thread lives before it exits (> 0)
+   * @param unit the time unit of the timeout argument
+   * @param threadFactory the factory to use when creating new threads
+   * @return a pool of at most maxCachedThread threads; every thread, core
+   * threads included, terminates once it has been idle for timeout.
+   */
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(
+      int maxCachedThread, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor boundedCachedThreadPool =
+      new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
+        unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
+    // let core threads time out too, so an idle pool holds no threads
+    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+    return boundedCachedThreadPool;
+  }
 }