You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/12 02:19:27 UTC
svn commit: r1230347 - in
/hbase/trunk/src/main/java/org/apache/hadoop/hbase: HConstants.java
regionserver/HRegion.java regionserver/Store.java util/Threads.java
Author: tedyu
Date: Thu Jan 12 01:19:26 2012
New Revision: 1230347
URL: http://svn.apache.org/viewvc?rev=1230347&view=rev
Log:
HBASE-5033 Differential Revision: 933 Opening/Closing store in parallel to reduce region open/close time (Liyin)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jan 12 01:19:26 2012
@@ -19,15 +19,15 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
/**
* HConstants holds a bunch of HBase-related constants
*/
@@ -218,6 +218,19 @@ public final class HConstants {
public static final String HREGION_MAX_FILESIZE =
"hbase.hregion.max.filesize";
+ /**
+ * Configuration key for the maximum number of threads used to open or close
+ * the stores of a region, or the store files of a store, in parallel.
+ */
+ public static final String HSTORE_OPEN_AND_CLOSE_THREADS_MAX =
+ "hbase.hstore.open.and.close.threads.max";
+
+ /**
+ * Default value for {@link #HSTORE_OPEN_AND_CLOSE_THREADS_MAX}. With a single
+ * thread, stores and store files are opened and closed one at a time.
+ */
+ public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
+
/** Default maximum file size */
public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jan 12 01:19:26 2012
@@ -40,10 +40,17 @@ import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -106,6 +113,7 @@ import org.apache.hadoop.hbase.util.Envi
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable;
@@ -114,6 +122,7 @@ import org.cliffc.high_scale_lib.Counter
import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
@@ -433,7 +442,7 @@ public class HRegion implements HeapSize
* @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
*/
public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
- HRegionInfo regionInfo, final HTableDescriptor htd,
+ final HRegionInfo regionInfo, final HTableDescriptor htd,
RegionServerServices rsServices) {
this.tableDir = tableDir;
this.comparator = regionInfo.getComparator();
@@ -542,20 +551,49 @@ public class HRegion implements HeapSize
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
- for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
- status.setStatus("Instantiating store for column family " + c);
- Store store = instantiateHStore(this.tableDir, c);
- this.stores.put(c.getName(), store);
- long storeSeqId = store.getMaxSequenceId();
- if (minSeqId == -1 || storeSeqId < minSeqId) {
- minSeqId = storeSeqId;
- }
- if (maxSeqId == -1 || storeSeqId > maxSeqId) {
- maxSeqId = storeSeqId;
- }
- long maxStoreMemstoreTS = store.getMaxMemstoreTS();
- if (maxStoreMemstoreTS > maxMemstoreTS) {
- maxMemstoreTS = maxStoreMemstoreTS;
+
+ if (this.htableDescriptor != null &&
+ !htableDescriptor.getFamilies().isEmpty()) {
+ // initialize the thread pool for opening stores in parallel.
+ ThreadPoolExecutor storeOpenerThreadPool =
+ getStoreOpenAndCloseThreadPool(
+ "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+ CompletionService<Store> completionService =
+ new ExecutorCompletionService<Store>(storeOpenerThreadPool);
+
+ // initialize each store in parallel
+ for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
+ status.setStatus("Instantiating store for column family " + family);
+ completionService.submit(new Callable<Store>() {
+ public Store call() throws IOException {
+ return instantiateHStore(tableDir, family);
+ }
+ });
+ }
+ try {
+ for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
+ Future<Store> future = completionService.take();
+ Store store = future.get();
+
+ this.stores.put(store.getColumnFamilyName().getBytes(), store);
+ long storeSeqId = store.getMaxSequenceId();
+ if (minSeqId == -1 || storeSeqId < minSeqId) {
+ minSeqId = storeSeqId;
+ }
+ if (maxSeqId == -1 || storeSeqId > maxSeqId) {
+ maxSeqId = storeSeqId;
+ }
+ long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+ if (maxStoreMemstoreTS > maxMemstoreTS) {
+ maxMemstoreTS = maxStoreMemstoreTS;
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ storeOpenerThreadPool.shutdownNow();
}
}
mvcc.initialize(maxMemstoreTS + 1);
@@ -883,8 +921,38 @@ public class HRegion implements HeapSize
}
List<StoreFile> result = new ArrayList<StoreFile>();
- for (Store store : stores.values()) {
- result.addAll(store.close());
+ if (!stores.isEmpty()) {
+ // initialize the thread pool for closing stores in parallel.
+ ThreadPoolExecutor storeCloserThreadPool =
+ getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+ + this.regionInfo.getRegionNameAsString());
+ CompletionService<ImmutableList<StoreFile>> completionService =
+ new ExecutorCompletionService<ImmutableList<StoreFile>>(
+ storeCloserThreadPool);
+
+ // close each store in parallel
+ for (final Store store : stores.values()) {
+ completionService
+ .submit(new Callable<ImmutableList<StoreFile>>() {
+ public ImmutableList<StoreFile> call() throws IOException {
+ return store.close();
+ }
+ });
+ }
+ try {
+ for (int i = 0; i < stores.size(); i++) {
+ Future<ImmutableList<StoreFile>> future = completionService
+ .take();
+ ImmutableList<StoreFile> storeFileList = future.get();
+ result.addAll(storeFileList);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ storeCloserThreadPool.shutdownNow();
+ }
}
this.closed.set(true);
@@ -900,6 +968,40 @@ public class HRegion implements HeapSize
}
}
+ /**
+ * Creates a pool for opening or closing the stores of this region in
+ * parallel. Its size is the configured
+ * {@link HConstants#HSTORE_OPEN_AND_CLOSE_THREADS_MAX}, capped at the number
+ * of column families and never less than one thread.
+ *
+ * @param threadNamePrefix prefix for the names of the pool's threads
+ * @return a new pool; callers shut it down when they are done with it
+ */
+ protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+ final String threadNamePrefix) {
+ int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
+ int configuredMax = conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+ HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX);
+ // Clamp to at least one thread: a zero or negative setting would otherwise
+ // reach the ThreadPoolExecutor constructor, which rejects a maximum pool
+ // size below one with IllegalArgumentException.
+ int maxThreads = Math.max(1, Math.min(numStores, configuredMax));
+ return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+ }
+
+ /**
+ * Creates a pool for opening or closing the store files of one store in
+ * parallel. The configured
+ * {@link HConstants#HSTORE_OPEN_AND_CLOSE_THREADS_MAX} budget is divided
+ * evenly (integer division) among the column families of this region, and
+ * each pool gets at least one thread.
+ *
+ * @param threadNamePrefix prefix for the names of the pool's threads
+ * @return a new pool; callers shut it down when they are done with it
+ */
+ protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+ final String threadNamePrefix) {
+ final int familyCount = this.htableDescriptor.getFamilies().size();
+ final int threadBudget = conf.getInt(
+ HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+ HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX);
+ // Stores may be opened concurrently, so each one only gets its share.
+ final int perStore = threadBudget / Math.max(1, familyCount);
+ return getOpenAndCloseThreadPool(Math.max(1, perStore), threadNamePrefix);
+ }
+
+ /**
+ * Creates a pool of at most {@code maxThreads} threads. The threads are named
+ * {@code threadNamePrefix + "-" + n}, with n counting up from 1, and idle
+ * threads terminate after 30 seconds.
+ *
+ * @param maxThreads maximum number of threads in the pool
+ * @param threadNamePrefix prefix for the names of the pool's threads
+ * @return a new pool; callers shut it down when they are done with it
+ */
+ private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ final String threadNamePrefix) {
+ return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ // If tasks are submitted from several threads at once, the executor
+ // may request new threads concurrently, so the sequence number is
+ // advanced atomically.
+ private final AtomicInteger count = new AtomicInteger(1);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, threadNamePrefix + "-" + count.getAndIncrement());
+ }
+ });
+ }
+
/**
* @return True if its worth doing a flush before we put up the close flag.
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Jan 12 01:19:26 2012
@@ -27,7 +27,13 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -271,38 +277,73 @@ public class Store extends SchemaConfigu
return homedir;
}
- /*
- * Creates an unsorted list of StoreFile loaded from the given directory.
+ /**
+ * Creates an unsorted list of StoreFile loaded in parallel
+ * from the given directory.
+ * <p>
+ * If loading fails, the readers of the store files collected so far are
+ * closed before the exception propagates.
* @throws IOException
*/
- private List<StoreFile> loadStoreFiles()
- throws IOException {
+ private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
- for (int i = 0; files != null && i < files.length; i++) {
+
+ if (files == null || files.length == 0) {
+ return results;
+ }
+ // initialize the thread pool for opening store files in parallel.
+ ThreadPoolExecutor storeFileOpenerThreadPool =
+ this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+ this.family.getNameAsString());
+ CompletionService<StoreFile> completionService =
+ new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
+
+ int totalValidStoreFile = 0;
+ for (int i = 0; i < files.length; i++) {
// Skip directories.
if (files[i].isDir()) {
continue;
}
- Path p = files[i].getPath();
- // Check for empty file. Should never be the case but can happen
+ final Path p = files[i].getPath();
+ // Check for empty file. Should never be the case but can happen
// after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
if (this.fs.getFileStatus(p).getLen() <= 0) {
LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
continue;
}
- StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
- passSchemaMetricsTo(curfile);
- curfile.createReader();
- long length = curfile.getReader().length();
- this.storeSize += length;
- this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
- if (LOG.isDebugEnabled()) {
- LOG.debug("loaded " + curfile.toStringDetailed());
+
+ // open each store file in parallel
+ completionService.submit(new Callable<StoreFile>() {
+ public StoreFile call() throws IOException {
+ StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
+ family.getBloomFilterType());
+ passSchemaMetricsTo(storeFile);
+ storeFile.createReader();
+ return storeFile;
+ }
+ });
+ totalValidStoreFile++;
+ }
+
+ boolean loadedAll = false;
+ try {
+ for (int i = 0; i < totalValidStoreFile; i++) {
+ Future<StoreFile> future = completionService.take();
+ StoreFile storeFile = future.get();
+ long length = storeFile.getReader().length();
+ this.storeSize += length;
+ this.totalUncompressedBytes +=
+ storeFile.getReader().getTotalUncompressedBytes();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loaded " + storeFile.toStringDetailed());
+ }
+ results.add(storeFile);
}
- results.add(curfile);
+ loadedAll = true;
+ } catch (InterruptedException e) {
+ // Preserve the interrupt for callers before converting to IOException.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ storeFileOpenerThreadPool.shutdownNow();
+ if (!loadedAll) {
+ // Close the readers collected so far so a failed load does not leak them.
+ // NOTE(review): files still being opened by other tasks when the failure
+ // occurred are not tracked here and may still leak.
+ for (StoreFile opened : results) {
+ try {
+ opened.closeReader(true);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close reader of " + opened + " after load failure", ioe);
+ }
+ }
+ }
}
+
return results;
}
@@ -499,8 +540,36 @@ public class Store extends SchemaConfigu
// Clear so metrics doesn't find them.
storefiles = ImmutableList.of();
- for (StoreFile f: result) {
- f.closeReader(true);
+ if (!result.isEmpty()) {
+ // initialize the thread pool for closing store files in parallel.
+ ThreadPoolExecutor storeFileCloserThreadPool = this.region
+ .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+ + this.family.getNameAsString());
+
+ // close each store file in parallel
+ CompletionService<Void> completionService =
+ new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
+ for (final StoreFile f : result) {
+ completionService.submit(new Callable<Void>() {
+ public Void call() throws IOException {
+ f.closeReader(true);
+ return null;
+ }
+ });
+ }
+
+ try {
+ for (int i = 0; i < result.size(); i++) {
+ Future<Void> future = completionService.take();
+ future.get();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ storeFileCloserThreadPool.shutdownNow();
+ }
}
LOG.debug("closed " + this.storeNameStr);
return result;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1230347&r1=1230346&r2=1230347&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java Thu Jan 12 01:19:26 2012
@@ -19,14 +19,17 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
/**
* Thread Utility
*/
@@ -152,4 +155,25 @@ public class Threads {
}
}
+ /**
+ * Creates a thread pool that runs at most {@code maxCachedThread} threads.
+ * Tasks submitted while every thread is busy wait in an unbounded queue.
+ * Core threads are allowed to time out, so a thread that stays idle for
+ * {@code timeout} terminates, and an unused pool eventually holds no threads.
+ *
+ * @param maxCachedThread the maximum number of threads the pool may create
+ * @param timeout how long an idle thread is kept alive before terminating
+ * @param unit the time unit of the {@code timeout} argument
+ * @param threadFactory the factory to use when creating new threads
+ * @return a new pool with a bounded number of threads
+ * @throws IllegalArgumentException if {@code maxCachedThread} or
+ * {@code timeout} is not positive
+ */
+ public static ThreadPoolExecutor getBoundedCachedThreadPool(
+ int maxCachedThread, long timeout, TimeUnit unit,
+ ThreadFactory threadFactory) {
+ // Use the caller-supplied unit for the keep-alive time.
+ ThreadPoolExecutor boundedCachedThreadPool =
+ new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
+ unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
+ // Let idle core threads time out and terminate.
+ boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+ return boundedCachedThreadPool;
+ }
}