You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/09/14 16:15:46 UTC
hbase git commit: HBASE-14306 Refine RegionGroupingProvider: fix
issues and make it more scalable (Yu Li)
Repository: hbase
Updated Branches:
refs/heads/master ceff3e242 -> 99df022f2
HBASE-14306 Refine RegionGroupingProvider: fix issues and make it more scalable (Yu Li)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/99df022f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/99df022f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/99df022f
Branch: refs/heads/master
Commit: 99df022f2c3c811dc68abb2c7a7915b47fe6f5ac
Parents: ceff3e2
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 14 07:15:35 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 14 07:15:35 2015 -0700
----------------------------------------------------------------------
.../MetricsRegionServerWrapperImpl.java | 13 +-
.../hbase/wal/BoundedGroupingStrategy.java | 67 +++++++
.../wal/BoundedRegionGroupingProvider.java | 159 ----------------
.../hadoop/hbase/wal/DefaultWALProvider.java | 28 +--
.../hadoop/hbase/wal/DisabledWALProvider.java | 10 +
.../hbase/wal/RegionGroupingProvider.java | 92 ++++++---
.../org/apache/hadoop/hbase/wal/WALFactory.java | 10 +-
.../apache/hadoop/hbase/wal/WALProvider.java | 10 +
.../apache/hadoop/hbase/wal/IOTestProvider.java | 10 +
.../wal/TestBoundedRegionGroupingProvider.java | 187 ------------------
.../wal/TestBoundedRegionGroupingStrategy.java | 190 +++++++++++++++++++
11 files changed, 373 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index e889605..8ad1f33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -35,12 +35,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
-import org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobFileCache;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;
@@ -660,10 +659,12 @@ class MetricsRegionServerWrapperImpl
}
lastRan = currentTime;
- numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory) +
- BoundedRegionGroupingProvider.getNumLogFiles(regionServer.walFactory);
- walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory) +
- BoundedRegionGroupingProvider.getLogFileSize(regionServer.walFactory);
+ WALProvider provider = regionServer.walFactory.getWALProvider();
+ WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider();
+ numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) +
+ (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
+ walFileSize = (provider == null ? 0 : provider.getLogFileSize()) +
+ (metaProvider == null ? 0 : metaProvider.getLogFileSize());
//Copy over computed values so that no thread sees half computed values.
numStores = tempNumStores;
numStoreFiles = tempNumStoreFiles;
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
new file mode 100644
index 0000000..14c5594
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
+
+/**
+ * A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to
+ * "hbase.wal.regiongrouping.numgroups" (never fewer than one), handing them out round-robin.
+ */
+@InterfaceAudience.Private
+public class BoundedGroupingStrategy implements RegionGroupingStrategy {
+  static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
+  static final int DEFAULT_NUM_REGION_GROUPS = 2;
+
+  private final ConcurrentHashMap<String, String> groupNameCache =
+      new ConcurrentHashMap<String, String>();
+  private final AtomicInteger counter = new AtomicInteger(0);
+  private String[] groupNames;
+
+  @Override
+  public String group(byte[] identifier) {
+    String idStr = Bytes.toString(identifier);
+    String groupName = groupNameCache.get(idStr);
+    if (null == groupName) {
+      // mask the sign bit so the index stays non-negative after the counter overflows
+      groupName = groupNames[(counter.getAndIncrement() & Integer.MAX_VALUE) % groupNames.length];
+      String extantName = groupNameCache.putIfAbsent(idStr, groupName);
+      // if another thread cached a group for this identifier first, use theirs
+      return extantName != null ? extantName : groupName;
+    }
+    return groupName;
+  }
+
+  @Override
+  public void init(Configuration config, String providerId) {
+    // never fewer than one group: group() would otherwise divide by zero
+    int regionGroupNumber =
+        Math.max(1, config.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS));
+    groupNames = new String[regionGroupNumber];
+    for (int i = 0; i < regionGroupNumber; i++) {
+      groupNames[i] = providerId + GROUP_NAME_DELIMITER + "regiongroup-" + i;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
deleted file mode 100644
index e1417b2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-// imports for classes still in regionserver.wal
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-
-/**
- * A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them.
- * Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control
- * the choice of delegate provider implementation and the grouping strategy the same as
- * {@link RegionGroupingProvider}.
- */
-@InterfaceAudience.Private
-public class BoundedRegionGroupingProvider extends RegionGroupingProvider {
- private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class);
-
- static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
- static final int DEFAULT_NUM_REGION_GROUPS = 2;
- private WALProvider[] delegates;
- private AtomicInteger counter = new AtomicInteger(0);
-
- @Override
- public void init(final WALFactory factory, final Configuration conf,
- final List<WALActionsListener> listeners, final String providerId) throws IOException {
- super.init(factory, conf, listeners, providerId);
- // no need to check for and close down old providers; our parent class will throw on re-invoke
- delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS,
- DEFAULT_NUM_REGION_GROUPS))];
- for (int i = 0; i < delegates.length; i++) {
- delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners,
- providerId + i);
- }
- LOG.info("Configured to run with " + delegates.length + " delegate WAL providers.");
- }
-
- @Override
- WALProvider populateCache(final byte[] group) {
- final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length];
- final WALProvider extant = cached.putIfAbsent(group, temp);
- // if someone else beat us to initializing, just take what they set.
- // note that in such a case we skew load away from the provider we picked at first
- return extant == null ? temp : extant;
- }
-
- @Override
- public void shutdown() throws IOException {
- // save the last exception and rethrow
- IOException failure = null;
- for (WALProvider provider : delegates) {
- try {
- provider.shutdown();
- } catch (IOException exception) {
- LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
- LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
- failure = exception;
- }
- }
- if (failure != null) {
- throw failure;
- }
- }
-
- @Override
- public void close() throws IOException {
- // save the last exception and rethrow
- IOException failure = null;
- for (WALProvider provider : delegates) {
- try {
- provider.close();
- } catch (IOException exception) {
- LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
- LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
- failure = exception;
- }
- }
- if (failure != null) {
- throw failure;
- }
- }
-
- /**
- * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
- * count the number of files (rolled and active). if either of them isn't, count 0
- * for that provider.
- * @param walFactory may not be null.
- */
- public static long getNumLogFiles(WALFactory walFactory) {
- long result = 0;
- if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
- BoundedRegionGroupingProvider groupProviders =
- (BoundedRegionGroupingProvider)walFactory.provider;
- for (int i = 0; i < groupProviders.delegates.length; i++) {
- result +=
- ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getNumLogFiles();
- }
- }
- WALProvider meta = walFactory.metaProvider.get();
- if (meta instanceof BoundedRegionGroupingProvider) {
- for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
- result += ((FSHLog)
- ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
- .getNumLogFiles(); }
- }
- return result;
- }
-
- /**
- * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
- * count the size of files (rolled and active). if either of them isn't, count 0
- * for that provider.
- * @param walFactory may not be null.
- */
- public static long getLogFileSize(WALFactory walFactory) {
- long result = 0;
- if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
- BoundedRegionGroupingProvider groupProviders =
- (BoundedRegionGroupingProvider)walFactory.provider;
- for (int i = 0; i < groupProviders.delegates.length; i++) {
- result +=
- ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getLogFileSize();
- }
- }
- WALProvider meta = walFactory.metaProvider.get();
- if (meta instanceof BoundedRegionGroupingProvider) {
- for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
- result += ((FSHLog)
- ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
- .getLogFileSize();
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index f9cbd23..ca463fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -126,36 +126,20 @@ public class DefaultWALProvider implements WALProvider {
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
* count the number of files (rolled and active). if either of them aren't, count 0
* for that provider.
- * @param walFactory may not be null.
*/
- public static long getNumLogFiles(WALFactory walFactory) {
- long result = 0;
- if (walFactory.provider instanceof DefaultWALProvider) {
- result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getNumLogFiles();
- }
- WALProvider meta = walFactory.metaProvider.get();
- if (meta instanceof DefaultWALProvider) {
- result += ((FSHLog)((DefaultWALProvider)meta).log).getNumLogFiles();
- }
- return result;
+ @Override
+ public long getNumLogFiles() {
+ return this.log.getNumLogFiles();
}
/**
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
* count the size of files (rolled and active). if either of them aren't, count 0
* for that provider.
- * @param walFactory may not be null.
*/
- public static long getLogFileSize(WALFactory walFactory) {
- long result = 0;
- if (walFactory.provider instanceof DefaultWALProvider) {
- result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getLogFileSize();
- }
- WALProvider meta = walFactory.metaProvider.get();
- if (meta instanceof DefaultWALProvider) {
- result += ((FSHLog)((DefaultWALProvider)meta).log).getLogFileSize();
- }
- return result;
+ @Override
+ public long getLogFileSize() {
+ return this.log.getLogFileSize();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 52becbe..10a097b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -219,4 +219,14 @@ class DisabledWALProvider implements WALProvider {
return "WAL disabled.";
}
}
+
+ @Override
+ public long getNumLogFiles() {
+ return 0;
+ }
+
+ @Override
+ public long getLogFileSize() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index eb2c426..8395818 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -32,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* A WAL Provider that returns a WAL per group of regions.
@@ -57,21 +60,23 @@ class RegionGroupingProvider implements WALProvider {
* Map identifiers to a group number.
*/
public static interface RegionGroupingStrategy {
+ String GROUP_NAME_DELIMITER = ".";
/**
* Given an identifier, pick a group.
* the byte[] returned for a given group must always use the same instance, since we
* will be using it as a hash key.
*/
- byte[] group(final byte[] identifier);
- void init(Configuration config);
+ String group(final byte[] identifier);
+ void init(Configuration config, String providerId);
}
/**
* Maps between configuration names for strategies and implementation classes.
*/
static enum Strategies {
- defaultStrategy(IdentityGroupingStrategy.class),
- identity(IdentityGroupingStrategy.class);
+ defaultStrategy(BoundedGroupingStrategy.class),
+ identity(IdentityGroupingStrategy.class),
+ bounded(BoundedGroupingStrategy.class);
final Class<? extends RegionGroupingStrategy> clazz;
Strategies(Class<? extends RegionGroupingStrategy> clazz) {
@@ -97,7 +102,7 @@ class RegionGroupingProvider implements WALProvider {
LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
try {
final RegionGroupingStrategy result = clazz.newInstance();
- result.init(conf);
+ result.init(conf, providerId);
return result;
} catch (InstantiationException exception) {
LOG.error("couldn't set up region grouping strategy, check config key " +
@@ -112,14 +117,18 @@ class RegionGroupingProvider implements WALProvider {
}
}
- private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
- private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
+ public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
+ public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
- protected final ConcurrentMap<byte[], WALProvider> cached =
- new ConcurrentHashMap<byte[], WALProvider>();
+ /** A group-provider mapping; keep it one-to-one rather than many-to-one */
+ protected final ConcurrentMap<String, WALProvider> cached =
+ new ConcurrentHashMap<String, WALProvider>();
+ /** Stores the delegate providers (without duplicates) used by this RegionGroupingProvider */
+ private final Set<WALProvider> providers = Collections
+ .synchronizedSet(new HashSet<WALProvider>());
protected RegionGroupingStrategy strategy = null;
@@ -142,7 +151,7 @@ class RegionGroupingProvider implements WALProvider {
/**
* Populate the cache for this group.
*/
- WALProvider populateCache(final byte[] group) throws IOException {
+ WALProvider populateCache(final String group) throws IOException {
final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
listeners, providerId + "-" + UUID.randomUUID());
final WALProvider extant = cached.putIfAbsent(group, temp);
@@ -151,12 +160,13 @@ class RegionGroupingProvider implements WALProvider {
temp.close();
return extant;
}
+ providers.add(temp);
return temp;
}
@Override
public WAL getWAL(final byte[] identifier) throws IOException {
- final byte[] group = strategy.group(identifier);
+ final String group = strategy.group(identifier);
WALProvider provider = cached.get(group);
if (null == provider) {
provider = populateCache(group);
@@ -168,13 +178,15 @@ class RegionGroupingProvider implements WALProvider {
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
- for (WALProvider provider : cached.values()) {
- try {
- provider.shutdown();
- } catch (IOException exception) {
- LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
- LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
- failure = exception;
+ synchronized (providers) {
+ for (WALProvider provider : providers) {
+ try {
+ provider.shutdown();
+ } catch (IOException exception) {
+ LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
+ LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+ failure = exception;
+ }
}
}
if (failure != null) {
@@ -186,13 +198,15 @@ class RegionGroupingProvider implements WALProvider {
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
- for (WALProvider provider : cached.values()) {
- try {
- provider.close();
- } catch (IOException exception) {
- LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
- LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
- failure = exception;
+ synchronized (providers) {
+ for (WALProvider provider : providers) {
+ try {
+ provider.close();
+ } catch (IOException exception) {
+ LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
+ LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+ failure = exception;
+ }
}
}
if (failure != null) {
@@ -202,11 +216,33 @@ class RegionGroupingProvider implements WALProvider {
static class IdentityGroupingStrategy implements RegionGroupingStrategy {
@Override
- public void init(Configuration config) {}
+ public void init(Configuration config, String providerId) {}
@Override
- public byte[] group(final byte[] identifier) {
- return identifier;
+ public String group(final byte[] identifier) {
+ return Bytes.toString(identifier);
}
}
+ @Override
+ public long getNumLogFiles() {
+ long numLogFiles = 0;
+ synchronized (providers) {
+ for (WALProvider provider : providers) {
+ numLogFiles += provider.getNumLogFiles();
+ }
+ }
+ return numLogFiles;
+ }
+
+ @Override
+ public long getLogFileSize() {
+ long logFileSize = 0;
+ synchronized (providers) {
+ for (WALProvider provider : providers) {
+ logFileSize += provider.getLogFileSize();
+ }
+ }
+ return logFileSize;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index bfeae68..e430e06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -75,7 +75,7 @@ public class WALFactory {
static enum Providers {
defaultProvider(DefaultWALProvider.class),
filesystem(DefaultWALProvider.class),
- multiwal(BoundedRegionGroupingProvider.class);
+ multiwal(RegionGroupingProvider.class);
Class<? extends WALProvider> clazz;
Providers(Class<? extends WALProvider> clazz) {
@@ -443,4 +443,12 @@ public class WALFactory {
throws IOException {
return DefaultWALProvider.createWriter(configuration, fs, path, false);
}
+
+ public final WALProvider getWALProvider() {
+ return this.provider;
+ }
+
+ public final WALProvider getMetaWALProvider() {
+ return this.metaProvider.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 178c322..b007ad2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -79,4 +79,14 @@ public interface WALProvider {
long getLength() throws IOException;
}
+ /**
+ * Get number of the log files this provider is managing
+ */
+ long getNumLogFiles();
+
+ /**
+ * Get size of the log files this provider is managing
+ */
+ long getLogFileSize();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d2581a1..e06a587 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -230,4 +230,14 @@ public class IOTestProvider implements WALProvider {
}
}
}
+
+ @Override
+ public long getNumLogFiles() {
+ return this.log.getNumLogFiles();
+ }
+
+ @Override
+ public long getLogFileSize() {
+ return this.log.getLogFileSize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
deleted file mode 100644
index f4d38a0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS;
-import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS;
-import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category({RegionServerTests.class, LargeTests.class})
-public class TestBoundedRegionGroupingProvider {
- protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class);
-
- @Rule
- public TestName currentTest = new TestName();
- protected static Configuration conf;
- protected static FileSystem fs;
- protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- @Before
- public void setUp() throws Exception {
- FileStatus[] entries = fs.listStatus(new Path("/"));
- for (FileStatus dir : entries) {
- fs.delete(dir.getPath(), true);
- }
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf = TEST_UTIL.getConfiguration();
- // Make block sizes small.
- conf.setInt("dfs.blocksize", 1024 * 1024);
- // quicker heartbeat interval for faster DN death notification
- conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
- conf.setInt("dfs.heartbeat.interval", 1);
- conf.setInt("dfs.client.socket-timeout", 5000);
-
- // faster failover with cluster.shutdown();fs.close() idiom
- conf.setInt("hbase.ipc.client.connect.max.retries", 1);
- conf.setInt("dfs.client.block.recovery.retries", 1);
- conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
-
- conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class);
-
- TEST_UTIL.startMiniDFSCluster(3);
-
- fs = TEST_UTIL.getDFSCluster().getFileSystem();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * Write to a log file with three concurrent threads and verifying all data is written.
- */
- @Test
- public void testConcurrentWrites() throws Exception {
- // Run the WPE tool with three threads writing 3000 edits each concurrently.
- // When done, verify that all edits were written.
- int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
- new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
- assertEquals(0, errCode);
- }
-
- /**
- * Make sure we can successfully run with more regions then our bound.
- */
- @Test
- public void testMoreRegionsThanBound() throws Exception {
- final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
- int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
- new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
- "-regions", parallelism});
- assertEquals(0, errCode);
- }
-
- @Test
- public void testBoundsGreaterThanDefault() throws Exception {
- final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
- try {
- conf.setInt(NUM_REGION_GROUPS, temp*4);
- final String parallelism = Integer.toString(temp*4);
- int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
- new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
- "-regions", parallelism});
- assertEquals(0, errCode);
- } finally {
- conf.setInt(NUM_REGION_GROUPS, temp);
- }
- }
-
- @Test
- public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
- final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
- try {
- conf.setInt(NUM_REGION_GROUPS, temp*4);
- final String parallelism = Integer.toString(temp*4*2);
- int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
- new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
- "-regions", parallelism});
- assertEquals(0, errCode);
- } finally {
- conf.setInt(NUM_REGION_GROUPS, temp);
- }
- }
-
- /**
- * Ensure that we can use Set.add to deduplicate WALs
- */
- @Test
- public void setMembershipDedups() throws IOException {
- final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
- WALFactory wals = null;
- try {
- conf.setInt(NUM_REGION_GROUPS, temp*4);
- // Set HDFS root directory for storing WAL
- FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
-
- wals = new WALFactory(conf, null, currentTest.getMethodName());
- final Set<WAL> seen = new HashSet<WAL>(temp*4);
- final Random random = new Random();
- int count = 0;
- // we know that this should see one of the wals more than once
- for (int i = 0; i < temp*8; i++) {
- final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()));
- LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
- if (seen.add(maybeNewWAL)) {
- count++;
- }
- }
- assertEquals("received back a different number of WALs that are not equal() to each other " +
- "than the bound we placed.", temp*4, count);
- } finally {
- if (wals != null) {
- wals.close();
- }
- conf.setInt(NUM_REGION_GROUPS, temp);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/99df022f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
new file mode 100644
index 0000000..8b15150
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
+import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
+import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
+import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+/** Exercises RegionGroupingProvider with the bounded strategy on a mini DFS cluster. */
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestBoundedRegionGroupingStrategy {
+ protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
+
+ @Rule
+ public TestName currentTest = new TestName();
+ protected static Configuration conf;
+ protected static FileSystem fs;
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Before
+ public void setUp() throws Exception {
+ FileStatus[] entries = fs.listStatus(new Path("/")); // clear the root dir before each test
+ for (FileStatus dir : entries) {
+ fs.delete(dir.getPath(), true);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception { // no per-test cleanup
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ // Make block sizes small.
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ // quicker heartbeat interval for faster DN death notification
+ conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+ conf.setInt("dfs.heartbeat.interval", 1);
+ conf.setInt("dfs.client.socket-timeout", 5000);
+
+ // faster failover with cluster.shutdown();fs.close() idiom
+ conf.setInt("hbase.ipc.client.connect.max.retries", 1);
+ conf.setInt("dfs.client.block.recovery.retries", 1);
+ conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
+
+ conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
+ conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
+
+ TEST_UTIL.startMiniDFSCluster(3);
+
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster(); // presumably also stops the mini DFS cluster; confirm
+ }
+
+ /**
+ * Write to a log file with three concurrent threads and verify that all data is written.
+ */
+ @Test
+ public void testConcurrentWrites() throws Exception {
+ // Run the WPE tool with three threads writing 3000 edits each concurrently.
+ // When done, verify that all edits were written.
+ int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+ new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
+ assertEquals(0, errCode);
+ }
+
+ /**
+ * Make sure we can successfully run with more regions than our bound.
+ */
+ @Test
+ public void testMoreRegionsThanBound() throws Exception {
+ final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
+ int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+ new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+ "-regions", parallelism});
+ assertEquals(0, errCode);
+ }
+
+ @Test
+ public void testBoundsGreaterThanDefault() throws Exception {
+ final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+ try {
+ conf.setInt(NUM_REGION_GROUPS, temp*4); // quadruple the bound
+ final String parallelism = Integer.toString(temp*4); // as many threads/regions as groups
+ int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+ new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+ "-regions", parallelism});
+ assertEquals(0, errCode);
+ } finally {
+ conf.setInt(NUM_REGION_GROUPS, temp); // restore the original bound
+ }
+ }
+
+ @Test
+ public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
+ final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+ try {
+ conf.setInt(NUM_REGION_GROUPS, temp*4); // quadruple the bound
+ final String parallelism = Integer.toString(temp*4*2); // twice as many as the bound
+ int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+ new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+ "-regions", parallelism});
+ assertEquals(0, errCode);
+ } finally {
+ conf.setInt(NUM_REGION_GROUPS, temp); // restore the original bound
+ }
+ }
+
+ /**
+ * Ensure that Set.add deduplicates WALs: the number of distinct WALs must equal the bound.
+ */
+ @Test
+ public void setMembershipDedups() throws IOException {
+ final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+ WALFactory wals = null;
+ try {
+ conf.setInt(NUM_REGION_GROUPS, temp*4);
+ // Set HDFS root directory for storing WAL
+ FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
+
+ wals = new WALFactory(conf, null, currentTest.getMethodName());
+ final Set<WAL> seen = new HashSet<WAL>(temp*4);
+ final Random random = new Random();
+ int count = 0;
+ // temp*8 lookups against a bound of temp*4: some WAL should be returned more than once
+ for (int i = 0; i < temp*8; i++) {
+ final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()));
+ LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
+ if (seen.add(maybeNewWAL)) { // add() is true only for a WAL not seen before
+ count++;
+ }
+ }
+ assertEquals("received back a different number of WALs that are not equal() to each other " +
+ "than the bound we placed.", temp*4, count);
+ } finally {
+ if (wals != null) {
+ wals.close();
+ }
+ conf.setInt(NUM_REGION_GROUPS, temp); // restore the original bound
+ }
+ }
+}