You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/09/21 15:42:52 UTC

hbase git commit: HBASE-14448 Refine RegionGroupingProvider Phase-2: remove provider nesting and formalize wal group name (Yu Li)

Repository: hbase
Updated Branches:
  refs/heads/master a7afc132e -> bf8596013


HBASE-14448 Refine RegionGroupingProvider Phase-2: remove provider nesting and formalize wal group name (Yu Li)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bf859601
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bf859601
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bf859601

Branch: refs/heads/master
Commit: bf859601317dbf6493818dc2a27e5e813112eb2c
Parents: a7afc13
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 21 06:42:39 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 21 06:42:39 2015 -0700

----------------------------------------------------------------------
 .../hbase/wal/BoundedGroupingStrategy.java      |   2 +-
 .../hadoop/hbase/wal/DefaultWALProvider.java    |  19 +--
 .../hbase/wal/RegionGroupingProvider.java       | 136 ++++++++++++-------
 3 files changed, 98 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bf859601/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
index 14c5594..c8f434f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
 
 /**
- * A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to
+ * A WAL grouping strategy that limits the number of wal groups to
  * "hbase.wal.regiongrouping.numgroups".
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf859601/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index f15a387..be95e16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -74,7 +74,7 @@ public class DefaultWALProvider implements WALProvider {
     void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
   }
 
-  protected FSHLog log = null;
+  protected volatile FSHLog log = null;
   private WALFactory factory = null;
   private Configuration conf = null;
   private List<WALActionsListener> listeners = null;
@@ -119,13 +119,16 @@ public class DefaultWALProvider implements WALProvider {
 
   @Override
   public WAL getWAL(final byte[] identifier) throws IOException {
-    // must lock since getWAL will create hlog on fs which is time consuming
-    synchronized (walCreateLock) {
-      if (log == null) {
-        log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
-                getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
-                listeners, true, logPrefix,
-                META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+    if (log == null) {
+      // only lock when need to create wal, and need to lock since
+      // creating hlog on fs is time consuming
+      synchronized (walCreateLock) {
+        if (log == null) {
+          log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+              getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
+              listeners, true, logPrefix,
+              META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+        }
       }
     }
     return log;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf859601/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 8395818..00a5651 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -18,23 +18,30 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
  * A WAL Provider that returns a WAL per group of regions.
@@ -43,14 +50,11 @@ import org.apache.hadoop.hbase.util.Bytes;
  * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
  * <ul>
  *   <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
- *                                  "identity".</li>
+ *                                  "bounded".</li>
  *   <li><em>identity</em> : each region belongs to its own group.</li>
+ *   <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
  * </ul>
  * Optionally, a FQCN to a custom implementation may be given.
- *
- * WAL creation is delegated to another WALProvider, configured via the property
- * "hbase.wal.regiongrouping.delegate". The property takes the same options as "hbase.wal.provider"
- * (ref {@link WALFactory}) and defaults to the defaultProvider.
  */
 @InterfaceAudience.Private
 class RegionGroupingProvider implements WALProvider {
@@ -120,21 +124,22 @@ class RegionGroupingProvider implements WALProvider {
   public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
   public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
 
-  static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
-  static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
+  private static final String META_WAL_GROUP_NAME = "meta";
 
-  /** A group-provider mapping, recommended to make sure one-one rather than many-one mapping */
-  protected final ConcurrentMap<String, WALProvider> cached =
-      new ConcurrentHashMap<String, WALProvider>();
-  /** Stores delegation providers (no duplicated) used by this RegionGroupingProvider */
-  private final Set<WALProvider> providers = Collections
-      .synchronizedSet(new HashSet<WALProvider>());
+  /** A group-wal mapping, recommended to make sure one-one rather than many-one mapping */
+  protected final Map<String, FSHLog> cached = new HashMap<String, FSHLog>();
+  /** Stores unique wals generated by this RegionGroupingProvider */
+  private final Set<FSHLog> logs = Collections.synchronizedSet(new HashSet<FSHLog>());
 
+  /**
+   * we synchronize on walCacheLock to prevent wal recreation in different threads
+   */
+  final Object walCacheLock = new Object();
 
   protected RegionGroupingStrategy strategy = null;
-  private WALFactory factory = null;
   private List<WALActionsListener> listeners = null;
   private String providerId = null;
+  private Configuration conf = null;
 
   @Override
   public void init(final WALFactory factory, final Configuration conf,
@@ -142,49 +147,80 @@ class RegionGroupingProvider implements WALProvider {
     if (null != strategy) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
-    this.factory = factory;
     this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
-    this.providerId = providerId;
+    StringBuilder sb = new StringBuilder().append(factory.factoryId);
+    if (providerId != null) {
+      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
+        sb.append(providerId);
+      } else {
+        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
+      }
+    }
+    this.providerId = sb.toString();
     this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
+    this.conf = conf;
   }
 
   /**
    * Populate the cache for this group.
    */
-  WALProvider populateCache(final String group) throws IOException {
-    final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
-        listeners, providerId + "-" + UUID.randomUUID());
-    final WALProvider extant = cached.putIfAbsent(group, temp);
-    if (null != extant) {
-      // someone else beat us to initializing, just take what they set.
-      temp.close();
-      return extant;
+  FSHLog populateCache(String groupName) throws IOException {
+    boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId);
+    String hlogPrefix;
+    List<WALActionsListener> listeners;
+    if (isMeta) {
+      hlogPrefix = this.providerId;
+      // don't watch log roll for meta
+      listeners = Collections.<WALActionsListener> singletonList(new MetricsWAL());
+    } else {
+      hlogPrefix = groupName;
+      listeners = this.listeners;
+    }
+    FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+        DefaultWALProvider.getWALDirectoryName(providerId), HConstants.HREGION_OLDLOGDIR_NAME,
+        conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : null);
+    cached.put(groupName, log);
+    logs.add(log);
+    return log;
+  }
+
+  private WAL getWAL(final String group) throws IOException {
+    WAL log = cached.get(walCacheLock);
+    if (null == log) {
+      // only lock when need to create wal, and need to lock since
+      // creating hlog on fs is time consuming
+      synchronized (this.walCacheLock) {
+        log = cached.get(group);// check again
+        if (null == log) {
+          log = populateCache(group);
+        }
+      }
     }
-    providers.add(temp);
-    return temp;
+    return log;
   }
 
   @Override
   public WAL getWAL(final byte[] identifier) throws IOException {
-    final String group = strategy.group(identifier);
-    WALProvider provider = cached.get(group);
-    if (null == provider) {
-      provider = populateCache(group);
+    final String group;
+    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
+      group = META_WAL_GROUP_NAME;
+    } else {
+      group = strategy.group(identifier);
     }
-    return provider.getWAL(identifier);
+    return getWAL(group);
   }
 
   @Override
   public void shutdown() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (providers) {
-      for (WALProvider provider : providers) {
+    synchronized (logs) {
+      for (FSHLog wal : logs) {
         try {
-          provider.shutdown();
+          wal.shutdown();
         } catch (IOException exception) {
-          LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
-          LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+          LOG.error("Problem shutting down log '" + wal + "': " + exception.getMessage());
+          LOG.debug("Details of problem shutting down log '" + wal + "'", exception);
           failure = exception;
         }
       }
@@ -198,13 +234,13 @@ class RegionGroupingProvider implements WALProvider {
   public void close() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (providers) {
-      for (WALProvider provider : providers) {
+    synchronized (logs) {
+      for (FSHLog wal : logs) {
         try {
-          provider.close();
+          wal.close();
         } catch (IOException exception) {
-          LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
-          LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+          LOG.error("Problem closing log '" + wal + "': " + exception.getMessage());
+          LOG.debug("Details of problem closing wal '" + wal + "'", exception);
           failure = exception;
         }
       }
@@ -226,9 +262,9 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getNumLogFiles() {
     long numLogFiles = 0;
-    synchronized (providers) {
-      for (WALProvider provider : providers) {
-        numLogFiles += provider.getNumLogFiles();
+    synchronized (logs) {
+      for (FSHLog wal : logs) {
+        numLogFiles += wal.getNumLogFiles();
       }
     }
     return numLogFiles;
@@ -237,9 +273,9 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getLogFileSize() {
     long logFileSize = 0;
-    synchronized (providers) {
-      for (WALProvider provider : providers) {
-        logFileSize += provider.getLogFileSize();
+    synchronized (logs) {
+      for (FSHLog wal : logs) {
+        logFileSize += wal.getLogFileSize();
       }
     }
     return logFileSize;