You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/11 20:18:27 UTC
svn commit: r1586735 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Fri Apr 11 18:18:26 2014
New Revision: 1586735
URL: http://svn.apache.org/r1586735
Log:
[master] Fix bugs in MemStoreFlusher
Author: daviddeng
Summary:
Use `ScheduledThreadPoolExecutor`
Change some logic
Add `HRegionIf` and `HRegionServerIf` interfaces.
Add testcase
Test Plan:
`TestMemStoreFlusher`
`TestRegionServerOnlineConfigChange`
`TestPerColumnFamilyFlush.testFlushingWhenLogRolling`
Reviewers: liyintang, gauravm, aaiyer, khadkevich, manukranthk
Reviewed By: manukranthk
CC: hbase-eng@, fan, elliott
Differential Revision: https://phabricator.fb.com/D1264374
Task ID: 4114747
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Apr 11 18:18:26 2014
@@ -1092,6 +1092,18 @@ public final class HConstants {
public static final String SWIFT_CLIENT_SOCKS_PROXY_HOST_AND_PORT =
"hbase.client.swift.socks.proxy.hostAndPort";
+ /**
+ * Key of the number of store-files for blocking flush
+ */
+ public static final String HSTORE_BLOCKING_STORE_FILES_KEY =
+ "hbase.hstore.blockingStoreFiles";
+
+ /**
+ * Key to the maximum wait time in milliseconds for blocking flush
+ */
+ public static final String HSTORE_BLOCKING_WAIT_TIME_KEY =
+ "hbase.hstore.blockingWaitTime";
+ public static final long DEFAULT_HSTORE_BLOCKING_WAIT_TIME = 90000;
private HConstants() {
// Can't be instantiated with this constructor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java Fri Apr 11 18:18:26 2014
@@ -37,5 +37,5 @@ public interface FlushRequester {
* would be flushed.
*
*/
- void request(HRegion region, boolean selectiveFlushRequest);
+ void request(HRegionIf region, boolean selectiveFlushRequest);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Apr 11 18:18:26 2014
@@ -146,7 +146,7 @@ import com.google.common.collect.Lists;
* regionName is a unique identifier for this HRegion. (startKey, endKey]
* defines the keyspace for this HRegion.
*/
-public class HRegion implements HeapSize, ConfigurationObserver {
+public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
public static final Log LOG = LogFactory.getLog(HRegion.class);
static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
@@ -718,10 +718,8 @@ public class HRegion implements HeapSize
}
}
- /**
- * @return True if this region has references.
- */
- boolean hasReferences() {
+ @Override
+ public boolean hasReferences() {
for (Store store : this.stores.values()) {
for (StoreFile sf : store.getStorefiles()) {
// Found a reference, return.
@@ -789,6 +787,7 @@ public class HRegion implements HeapSize
}
/** @return a HRegionInfo object for this region */
+ @Override
public HRegionInfo getRegionInfo() {
return this.regionInfo;
}
@@ -1052,7 +1051,7 @@ public class HRegion implements HeapSize
return this.lastStoreFlushTimeMap.get(store);
}
- /** @return how info about the last flushes <time, size> */
+ @Override
public List<Pair<Long,Long>> getRecentFlushInfo() {
// only MemStoreFlusher thread should be calling this, so read lock is okay
this.splitsAndClosesLock.readLock().lock();
@@ -1344,32 +1343,11 @@ public class HRegion implements HeapSize
* @throws IOException
*/
public boolean flushcache() throws IOException {
- return flushcache(false);
+ return flushMemstoreShapshot(false);
}
- /**
- * Flush the cache.
- *
- * When this method is called the cache will be flushed unless:
- * <ol>
- * <li>the cache is empty</li>
- * <li>the region is closed.</li>
- * <li>a flush is already in progress</li>
- * <li>writes are disabled</li>
- * </ol>
- *
- * <p>This method may block for some time, so it should not be called from a
- * time-sensitive thread.
- *
- * @param selectiveFlushRequest If true, selectively flush column families
- * which dominate the memstore size, provided it
- * is enabled in the configuration.
- *
- * @return true if cache was flushed
- *
- * @throws IOException general io exceptions
- */
- public boolean flushcache(boolean selectiveFlushRequest) throws IOException {
+ @Override
+ public boolean flushMemstoreShapshot(boolean selectiveFlushRequest) throws IOException {
// If a selective flush was requested, but the per-column family switch is
// off, we cannot do a selective flush.
if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
@@ -2337,6 +2315,7 @@ public class HRegion implements HeapSize
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
+ @SuppressWarnings("deprecation")
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
throws IOException{
@@ -2914,6 +2893,9 @@ public class HRegion implements HeapSize
return this.stores.get(column);
}
+ /**
+ * @return a map from column family to Store.
+ */
public Map<byte[], Store> getStores() {
return this.stores;
}
@@ -3675,6 +3657,7 @@ public class HRegion implements HeapSize
* @return result
* @throws IOException read exceptions
*/
+ @SuppressWarnings("deprecation")
public Result get(final Get get, final Integer lockid) throws IOException {
// Verify families are all valid
if (get.hasFamilies()) {
@@ -4157,4 +4140,13 @@ public class HRegion implements HeapSize
s.updateConfiguration();
}
}
+
+ @Override
+ public int maxStoreFilesCount() {
+ int res = 0;
+ for (Store hstore : this.stores.values()) {
+ res = Math.max(res, hstore.getStorefilesCount());
+ }
+ return res;
+ }
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,77 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The interface of a storage instance of a whole region.
+ *
+ * HRegion is the implementation used by the online region server.
+ */
+public interface HRegionIf {
+ /**
+ * @return the HRegionInfo of this region
+ */
+ public HRegionInfo getRegionInfo();
+
+ /**
+ * Flushes the cache.
+ *
+ * When this method is called the cache will be flushed unless:
+ * <ol>
+ * <li>the cache is empty</li>
+ * <li>the region is closed.</li>
+ * <li>a flush is already in progress</li>
+ * <li>writes are disabled</li>
+ * </ol>
+ *
+ * <p>
+ * This method may block for some time, so it should not be called from a
+ * time-sensitive thread.
+ *
+ * <p>
+ * NOTE(review): the method name misspells "Snapshot" as "Shapshot"; the
+ * name is part of the interface and is left unchanged here.
+ *
+ * @param selectiveFlushRequest If true, selectively flush column families
+ * which dominate the memstore size, provided it
+ * is enabled in the configuration.
+ *
+ * @return true if cache was flushed
+ * @throws IOException general io exceptions
+ */
+ public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+ throws IOException;
+
+ /**
+ * @return information about the most recent flushes, as a list of
+ * {@code <time, size>} pairs
+ */
+ public List<Pair<Long, Long>> getRecentFlushInfo();
+
+ /**
+ * @return True if this region has references.
+ */
+ public boolean hasReferences();
+
+ /**
+ * @return the largest store-file count found in any single store of this
+ * region.
+ */
+ public int maxStoreFilesCount();
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr 11 18:18:26 2014
@@ -98,7 +98,6 @@ import org.apache.hadoop.hbase.UnknownSc
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -120,7 +119,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
-import org.apache.hadoop.hbase.io.hfile.histogram.HistogramUtils;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
@@ -176,7 +174,7 @@ import com.google.common.base.Preconditi
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
-public class HRegionServer implements HRegionInterface,
+public class HRegionServer implements HRegionInterface, HRegionServerIf,
HBaseRPCErrorHandler, Runnable, Watcher, ConfigurationObserver {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
@@ -1322,6 +1320,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public AtomicLong getGlobalMemstoreSize() {
return globalMemstoreSize;
}
@@ -1459,12 +1458,7 @@ public class HRegionServer implements HR
return stop;
}
- /**
- * Checks to see if the file system is still accessible.
- * If not, sets abortRequested and stopRequested
- *
- * @return false if file system is not available
- */
+ @Override
public void checkFileSystem() {
long curtime = EnvironmentEdgeManager.currentTimeMillis();
synchronized (lastCheckFSAt){
@@ -1808,9 +1802,7 @@ public class HRegionServer implements HR
}
}
- /**
- * @return Region server metrics instance.
- */
+ @Override
public RegionServerMetrics getMetrics() {
return this.metrics;
}
@@ -3468,10 +3460,7 @@ public class HRegionServer implements HR
Bytes.mapKey(hr.getRegionInfo().getRegionName()), hr);
}
- /**
- * @return A new Map of online regions sorted by region size with the first
- * entry being the biggest.
- */
+ @Override
public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
// we'll sort the regions in reverse
SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
@@ -4004,7 +3993,7 @@ public class HRegionServer implements HR
return null;
}
- /** @return what the regionserver thread name should be */
+ @Override
public String getRSThreadName() {
return "RS-" + serverInfo.getServerName();
}
@@ -4118,10 +4107,20 @@ public class HRegionServer implements HR
@Override
public HRegionLocation getLocation(byte[] table, byte[] row, boolean reload)
- throws IOException {
+ throws IOException {
if (reload) {
return regionServerConnection.relocateRegion(new StringBytes(table), row);
}
return regionServerConnection.locateRegion(new StringBytes(table), row);
}
+
+ @Override
+ public boolean requestSplit(HRegionIf r) {
+ return this.compactSplitThread.requestSplit((HRegion) r);
+ }
+
+ @Override
+ public void requestCompaction(HRegionIf r, String why) {
+ this.compactSplitThread.requestCompaction((HRegion) r, why);
+ }
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+
+/**
+ * The interface representing a region server.
+ * A region server makes a set of HRegions available to clients.
+ *
+ * HRegionServer is the implementation used by the online region server.
+ */
+public interface HRegionServerIf {
+ /**
+ * @return what the regionserver thread name should be
+ */
+ public String getRSThreadName();
+
+ /**
+ * Checks to see if the file system is still accessible.
+ * If not, sets abortRequested and stopRequested.
+ *
+ * This method returns nothing; the outcome is only visible through those
+ * flags.
+ */
+ public void checkFileSystem();
+
+ /**
+ * Requests the region server to make a split on a specific region.
+ *
+ * @param r the region to split.
+ * @return presumably true if a split was requested for the region
+ * (TODO confirm against CompactSplitThread.requestSplit).
+ */
+ public boolean requestSplit(HRegionIf r);
+
+ /**
+ * Requests the region server to make a compaction on a specific region.
+ *
+ * @param r the region to compact.
+ * @param why Why compaction requested -- used in debug messages
+ */
+ public void requestCompaction(HRegionIf r, String why);
+
+ /**
+ * @return Region server metrics instance.
+ */
+ public RegionServerMetrics getMetrics();
+
+ /**
+ * @return the size of global mem-store in bytes as an AtomicLong.
+ */
+ public AtomicLong getGlobalMemstoreSize();
+
+ /**
+ * @return A new SortedMap of online regions sorted by region size with the
+ * first entry being the biggest.
+ */
+ public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize();
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr 11 18:18:26 2014
@@ -19,30 +19,29 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.DaemonThreadFactory;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.util.StringUtils;
-
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.SortedMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Future;
-import java.util.concurrent.Delayed;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
/**
* Thread that flushes cache on request
@@ -55,14 +54,10 @@ import java.util.concurrent.locks.Reentr
*/
class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
- // These two data members go together. Any entry in the one must have
- // a corresponding entry in the other.
- private final Map<HRegion, FlushQueueEntry> regionsInQueue =
- new HashMap<HRegion, FlushQueueEntry>();
-
- private final boolean perColumnFamilyFlushEnabled;
- private final HRegionServer server;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Map<HRegionIf, Pair<FlushQueueEntry, Future<Boolean>>>
+ regionsInQueue = new HashMap<>();
+
+ private final HRegionServerIf server;
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
@@ -73,20 +68,18 @@ class MemStoreFlusher implements FlushRe
"hbase.regionserver.global.memstore.upperLimit";
private static final String LOWER_KEY =
"hbase.regionserver.global.memstore.lowerLimit";
- private long blockingStoreFilesNumber;
+
+ private int blockingStoreFilesNumber;
private long blockingWaitTime;
private int handlerCount;
- private final ThreadPoolExecutor flushes;
- Map<FlushQueueEntry, Future> futures = new HashMap<FlushQueueEntry, Future>();
+ private final ScheduledThreadPoolExecutor threadPool;
/**
* @param conf
* @param server
*/
- public MemStoreFlusher(final Configuration conf,
- final HRegionServer server) {
- super();
+ public MemStoreFlusher(final Configuration conf, HRegionServerIf server) {
this.server = server;
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
@@ -99,17 +92,15 @@ class MemStoreFlusher implements FlushRe
}
this.globalMemStoreLimitLowMark = lower;
this.blockingStoreFilesNumber =
- conf.getInt("hbase.hstore.blockingStoreFiles", -1);
+ conf.getInt(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, -1);
if (this.blockingStoreFilesNumber == -1) {
this.blockingStoreFilesNumber = 1 +
conf.getInt("hbase.hstore.compactionThreshold", 3);
}
- this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
- 90000);
+ this.blockingWaitTime =
+ conf.getLong(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY,
+ HConstants.DEFAULT_HSTORE_BLOCKING_WAIT_TIME);
- this.perColumnFamilyFlushEnabled = conf.getBoolean(
- HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
- HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
// number of "memstore flusher" threads per region server
this.handlerCount = conf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
@@ -118,9 +109,9 @@ class MemStoreFlusher implements FlushRe
", globalMemStoreLimitLowMark=" +
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
", maxHeap=" + StringUtils.humanReadableInt(max));
- this.flushes = new ThreadPoolExecutor(handlerCount, handlerCount,
- 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ this.threadPool = new ScheduledThreadPoolExecutor(handlerCount,
new DaemonThreadFactory("flush-thread-"));
+ this.threadPool.setMaximumPoolSize(handlerCount);
}
/**
@@ -147,58 +138,70 @@ class MemStoreFlusher implements FlushRe
return (long)(max * limit);
}
- public void request(HRegion r, boolean isSelective) {
+ @Override
+ public void request(HRegionIf r, boolean isSelective) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushQueueEntry fqe = new FlushQueueEntry(r, isSelective);
- this.regionsInQueue.put(r, fqe);
- executeFlushQueueEntry(fqe);
+ regionsInQueue.put(r, Pair.newPair(fqe, (Future<Boolean>) null));
+ executeFlushQueueEntry(fqe, 0);
+ } else {
+ LOG.info("Flush for " + r + " already scheduled.");
}
}
}
- protected void executeFlushQueueEntry(final FlushQueueEntry fqe) {
- Runnable runnable = new Runnable() {
+ /**
+ * Must be called while holding the lock on regionsInQueue.
+ */
+ protected void executeFlushQueueEntry(final FlushQueueEntry fqe, long msDelay) {
+ Callable<Boolean> callable = new Callable<Boolean>() {
@Override
- public void run() {
- try {
- String name = String.format("%s.cacheFlusher.%d", MemStoreFlusher.this.server.getRSThreadName(),
- MemStoreFlusher.this.flushes.getCorePoolSize() + 1);
- if (!flushRegion(fqe, name)) {
- LOG.warn("Failed to flush " + fqe.region);
- }
- } catch (Exception ex) {
- LOG.error("Cache flush failed" +
- (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
- ex
- );
- server.checkFileSystem();
+ public Boolean call() throws Exception {
+ String name =
+ String.format("%s.cacheFlusher.%d",
+ MemStoreFlusher.this.server.getRSThreadName(),
+ Thread.currentThread().getId());
+ if (!flushRegion(fqe, name)) {
+ LOG.warn("Failed to flush " + fqe.region);
+ return false;
}
+ return true;
}
};
- futures.put(fqe, this.flushes.submit(runnable));
+
+ LOG.debug("Schedule a flush request " + fqe + " with delay " + msDelay
+ + "ms");
+
+ Future<Boolean> future =
+ this.threadPool.schedule(callable, msDelay, TimeUnit.MILLISECONDS);
+ Pair<FlushQueueEntry, Future<Boolean>> pair =
+ regionsInQueue.get(fqe.region);
+ if (pair != null) {
+ pair.setSecond(future);
+ }
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
- flushes.shutdown();
+ threadPool.shutdown();
}
boolean isAlive() {
- return !flushes.isShutdown();
+ return !threadPool.isShutdown();
}
void join() {
boolean done = false;
while (!done) {
try {
- done = flushes.awaitTermination(60, TimeUnit.SECONDS);
LOG.debug("Waiting for flush thread to finish...");
+ done = threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
LOG.error("Interrupted waiting for flush thread to finish...");
}
@@ -209,25 +212,26 @@ class MemStoreFlusher implements FlushRe
* A flushRegion that checks store file count. If too many, puts the flush
* on delay queue to retry later.
* @param fqe
- * @return true if the region was successfully flushed, false otherwise. If
+ * @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region
* was not flushed.
*/
private boolean flushRegion(final FlushQueueEntry fqe, String why) {
- HRegion region = fqe.region;
+ HRegionIf region = fqe.region;
if (!fqe.region.getRegionInfo().isMetaRegion() &&
- isTooManyStoreFiles(region)) {
+ region.maxStoreFilesCount() > this.blockingStoreFilesNumber) {
if (fqe.isMaximumWait(this.blockingWaitTime)) {
- LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
- "ms on a compaction to clean up 'too many store files'; waited " +
- "long enough... proceeding with flush of " +
- region.getRegionNameAsString());
+ LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime)
+ + "ms on a compaction to clean up 'too many store files'; waited "
+ + "long enough... proceeding with flush of "
+ + region.getRegionInfo().getRegionNameAsString());
} else {
// If this is first time we've been put off, then emit a log message.
if (fqe.getRequeueCount() <= 0) {
// Note: We don't impose blockingStoreFiles constraint on meta regions
- LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
- "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+ LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString()
+ + " has too many store files; delaying flush up to "
+ + this.blockingWaitTime + "ms");
}
/* If a split has been requested, we avoid scheduling a compaction
@@ -237,27 +241,32 @@ class MemStoreFlusher implements FlushRe
* references to parent regions are removed, and we can split this
* region further.
*/
- if (!this.server.compactSplitThread.requestSplit(region)
- || region.hasReferences()) {
- this.server.compactSplitThread.requestCompaction(region, why);
+ if (!this.server.requestSplit(region) || region.hasReferences()) {
+ this.server.requestCompaction(region, why);
}
- // Put back on the queue. Have it come back out of the queue
- // after a delay of this.blockingWaitTime / 100 ms.
- executeFlushQueueEntry(fqe.requeue(this.blockingWaitTime / 100));
- // Tell a lie, it's not flushed but it's ok
+
+ synchronized (this.regionsInQueue) {
+ // Put back on the queue. Have it come back out of the queue
+ // after a delay of this.blockingWaitTime / 100 ms.
+ executeFlushQueueEntry(fqe.requeue(), this.blockingWaitTime / 100);
+ }
+ // Tell a lie, it's not flushed but it's OK
return true;
}
}
- return flushRegion(region, why, false, fqe.isSelectiveFlushRequest());
+ try {
+ return flushRegionNow(region, why, fqe.selective());
+ } finally {
+ // the task is executed, remove from regionsInQueue
+ synchronized (this.regionsInQueue) {
+ this.regionsInQueue.remove(region);
+ }
+ }
}
/**
* Flush a region.
* @param region Region to flush.
- * @param emergencyFlush Set if we are being force flushed. If true the region
- * needs to be removed from the flush queue. If false, when we were called
- * from the main flusher run loop and we got the entry to flush by calling
- * poll on the flush queue (which removed it).
* @param selectiveFlushRequest Do we want to selectively flush only the
* column families that dominate the memstore size?
*
@@ -265,52 +274,25 @@ class MemStoreFlusher implements FlushRe
* false, there will be accompanying log messages explaining why the region
* was not flushed.
*/
- private boolean flushRegion(final HRegion region, String why,
- final boolean emergencyFlush, boolean selectiveFlushRequest) {
-
- synchronized (this.regionsInQueue) {
- FlushQueueEntry fqe = this.regionsInQueue.remove(region);
- if (fqe != null && emergencyFlush) {
- Future future = futures.get(fqe);
- if (future != null) {
- try {
- future.get();
- if (region.flushcache(selectiveFlushRequest)) {
- server.compactSplitThread.requestCompaction(region, why);
- }
- server.getMetrics().addFlush(region.getRecentFlushInfo());
- } catch (IOException ex) {
- LOG.warn("Cache flush failed" +
- (region != null ? (" for region " +
- Bytes.toString(region.getRegionName())) : ""),
- RemoteExceptionHandler.checkIOException(ex)
- );
- server.checkFileSystem();
- return false;
- } catch (InterruptedException e) {
- LOG.warn("Flush failed" +
- (region != null ? (" for region " +
- Bytes.toString(region.getRegionName())) : ""));
- } catch (ExecutionException e) {
- LOG.warn("Flush failed" +
- (region != null ? (" for region " +
- Bytes.toString(region.getRegionName())) : ""));
- } finally {
- lock.readLock().unlock();
- }
- }
+ private boolean flushRegionNow(HRegionIf region, String why,
+ boolean selectiveFlushRequest) {
+ try {
+ boolean res = region.flushMemstoreShapshot(selectiveFlushRequest);
+ if (res) {
+ server.requestCompaction(region, why);
}
+ server.getMetrics().addFlush(region.getRecentFlushInfo());
+ return res;
+ } catch (IOException ex) {
+ LOG.warn(
+ "Cache flush failed"
+ + (region != null
+ ? (" for region " + region.getRegionInfo().getRegionNameAsString())
+ : ""),
+ RemoteExceptionHandler.checkIOException(ex));
+ server.checkFileSystem();
+ return false;
}
- return true;
- }
-
- private boolean isTooManyStoreFiles(HRegion region) {
- for (Store hstore: region.stores.values()) {
- if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
- return true;
- }
- }
- return false;
}
/**
@@ -325,6 +307,39 @@ class MemStoreFlusher implements FlushRe
}
}
+ /**
+ * Makes an emergency flush.
+ *
+ * If a flush request for the region is already in the queue, this method
+ * waits for that flush to finish. Otherwise the flush is performed in the
+ * current thread via {@link #flushRegionNow(HRegionIf, String, boolean)}.
+ */
+ private boolean doEmergencyFlush(HRegionIf region, String why,
+ boolean selectiveFlushRequest) {
+ Pair<FlushQueueEntry, Future<Boolean>> pair;
+ synchronized (regionsInQueue) {
+ pair = regionsInQueue.get(region);
+ }
+ if (pair != null) {
+ // Already has flush request, wait for its finish.
+ try {
+ return pair.getSecond().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("Interrupted waiting for flushing of " + region, e);
+ return false;
+ } catch (ExecutionException e) {
+ // This should not actually happen: all exceptions should have been
+ // caught inside the Callable.
+ LOG.info("ExecutionException caught for flushing of " + region, e);
+ return false;
+ }
+ }
+
+ // Perform a flush in current thread
+ return flushRegionNow(region, why, selectiveFlushRequest);
+ }
+
/*
* Emergency! Need to flush memory.
*/
@@ -332,14 +347,17 @@ class MemStoreFlusher implements FlushRe
if (this.server.getGlobalMemstoreSize().get() < globalMemStoreLimit) {
return; // double check the global memstore size inside of the synchronized block.
}
-
+
// keep flushing until we hit the low water mark
long globalMemStoreSize = -1;
ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
- for (SortedMap<Long, HRegion> m =
+ SortedMap<Long, HRegion> m =
this.server.getCopyOfOnlineRegionsSortedBySize();
- (globalMemStoreSize = this.server.getGlobalMemstoreSize().get()) >=
- this.globalMemStoreLimitLowMark;) {
+ while (true) {
+ globalMemStoreSize = this.server.getGlobalMemstoreSize().get();
+ if (globalMemStoreSize < this.globalMemStoreLimitLowMark) {
+ break;
+ }
// flush the region with the biggest memstore
if (m.size() <= 0) {
LOG.info("No online regions to flush though we've been asked flush " +
@@ -356,52 +374,50 @@ class MemStoreFlusher implements FlushRe
" exceeded; currently " +
StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
- if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true, false)) {
+ if (!doEmergencyFlush(biggestMemStoreRegion, "emergencyFlush", false)) {
LOG.warn("Flush failed");
break;
}
regionsToCompact.add(biggestMemStoreRegion);
}
for (HRegion region : regionsToCompact) {
- server.compactSplitThread.requestCompaction(region, "emergencyFlush");
+ server.requestCompaction(region, "emergencyFlush");
}
}
/**
- * Datastructure used in the flush queue. Holds region and retry count.
- * Keeps tabs on how old this object is. Implements {@link Delayed}. On
+ * Data structure used in the flush queue. Holds region and retry count.
+ * Keeps tabs on how old this object is. Implements {@link Delayed}. On
* construction, the delay is zero. When added to a delay queue, we'll come
- * out near immediately. Call {@link #requeue(long)} passing delay in
- * milliseconds before readding to delay queue if you want it to stay there
+ * out near immediately. Call {@link #requeue(long)} passing delay in
+ * milliseconds before re-adding to the delay queue if you want it to stay there
* a while.
*/
- static class FlushQueueEntry implements Delayed {
- private final HRegion region;
+ static class FlushQueueEntry {
+ private final HRegionIf region;
private final long createTime;
- private long whenToExpire;
private int requeueCount = 0;
- private boolean selectiveFlushRequest;
+ private boolean selective;
/**
* @param r The region to flush
- * @param selectiveFlushRequest Do we want to flush only the column
+ * @param selective Do we want to flush only the column
* families that dominate the memstore size,
* i.e., do a selective flush? If we are
* doing log rolling, then we should not do a
* selective flush.
*/
- FlushQueueEntry(final HRegion r, boolean selectiveFlushRequest) {
+ FlushQueueEntry(final HRegionIf r, boolean selective) {
this.region = r;
this.createTime = System.currentTimeMillis();
- this.whenToExpire = this.createTime;
- this.selectiveFlushRequest = selectiveFlushRequest;
+ this.selective = selective;
}
/**
* @return Is this a request for a selective flush?
*/
- public boolean isSelectiveFlushRequest() {
- return selectiveFlushRequest;
+ public boolean selective() {
+ return selective;
}
/**
@@ -419,29 +435,21 @@ class MemStoreFlusher implements FlushRe
public int getRequeueCount() {
return this.requeueCount;
}
-
+
/**
- * @param when When to expire, when to come up out of the queue.
- * Specify in milliseconds. This method adds System.currentTimeMillis()
- * to whatever you pass.
- * @return This.
+ * Increases the requeue count.
+ *
+ * @return this.
*/
- public FlushQueueEntry requeue(final long when) {
- this.whenToExpire = System.currentTimeMillis() + when;
+ public FlushQueueEntry requeue() {
this.requeueCount++;
return this;
}
@Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.whenToExpire - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- }
-
- @Override
- public int compareTo(Delayed other) {
- return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
- other.getDelay(TimeUnit.MILLISECONDS)).intValue();
+ public String toString() {
+ return "{regin: " + region + ", created: " + createTime + ", requeue: "
+ + requeueCount + ", selective: " + selective + "}";
}
}
@@ -451,12 +459,11 @@ class MemStoreFlusher implements FlushRe
// number of "memstore flusher" threads per region server
int handlerCount = newConf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
if(this.handlerCount != handlerCount){
- LOG.info("Changing the value of " + HConstants.FLUSH_THREADS +
- " from " + this.handlerCount + " to " +
- handlerCount);
+ LOG.info("Changing the value of " + HConstants.FLUSH_THREADS + " from "
+ + this.handlerCount + " to " + handlerCount);
}
- this.flushes.setMaximumPoolSize(handlerCount);
- this.flushes.setCorePoolSize(handlerCount);
+ this.threadPool.setMaximumPoolSize(handlerCount);
+ this.threadPool.setCorePoolSize(handlerCount);
this.handlerCount = handlerCount;
}
@@ -467,7 +474,35 @@ class MemStoreFlusher implements FlushRe
* @return
*/
protected int getFlushThreadNum() {
- return this.flushes.getCorePoolSize();
+ return this.threadPool.getCorePoolSize();
}
+ /**
+ * Waits for all currently queued flush requests to complete.
+ * Used only in test cases.
+ */
+ void waitAllRequestDone() throws ExecutionException, InterruptedException {
+ while (true) {
+ // Fetch futures.
+ List<Future<Boolean>> futures = new ArrayList<>();
+ synchronized (this.regionsInQueue) {
+ for (Pair<FlushQueueEntry, Future<Boolean>> pair :
+ regionsInQueue.values()) {
+ futures.add(pair.getSecond());
+ }
+ }
+
+ if (futures.size() == 0) {
+ // No more requests, quit
+ return;
+ }
+
+ // Wait for futures
+ for (Future<Boolean> future : futures) {
+ future.get();
+ }
+ // This is a loop because some new requests may be generated during
+ // executing current requests.
+ }
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java Fri Apr 11 18:18:26 2014
@@ -164,7 +164,7 @@ public class TestPerColumnFamilyFlush ex
(cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
// Flush!
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
// Will use these to check if anything changed.
long oldCF2MemstoreSize = cf2MemstoreSize;
@@ -205,7 +205,7 @@ public class TestPerColumnFamilyFlush ex
oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
// Flush again
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
// Recalculate everything
cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
@@ -228,7 +228,7 @@ public class TestPerColumnFamilyFlush ex
// In that case, we should flush all the CFs.
// Clearing the existing memstores.
- region.flushcache(false);
+ region.flushMemstoreShapshot(false);
// The memstore limit is 200*1024 and the column family flush threshold is
// around 50*1024. We try to just hit the memstore limit with each CF's
@@ -241,7 +241,7 @@ public class TestPerColumnFamilyFlush ex
region.put(createPut(5, i));
}
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
// Since we won't find any CF above the threshold, and hence no specific
// store to flush, we should flush all the memstores.
Assert.assertEquals(0, region.getMemstoreSize().get());
@@ -254,7 +254,7 @@ public class TestPerColumnFamilyFlush ex
* FM1 FM2 FM3
*/
// flush all
- region.flushcache(false);
+ region.flushMemstoreShapshot(false);
region.put(createPut(2, 1));
region.put(createPut(3, 1));
@@ -279,7 +279,7 @@ public class TestPerColumnFamilyFlush ex
* FM1 FM2 FM3
*/
// flush, should only flush family 1
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
final long sizeFamily1_2 = region.getStore(FAMILY1).getMemStoreSize();
final long sizeFamily2_2 = region.getStore(FAMILY2).getMemStoreSize();
@@ -295,7 +295,7 @@ public class TestPerColumnFamilyFlush ex
* FM1 FM2 FM3
*/
// flush all
- region.flushcache(false);
+ region.flushMemstoreShapshot(false);
region.put(createPut(2, 1));
region.put(createPut(3, 1));
@@ -318,7 +318,7 @@ public class TestPerColumnFamilyFlush ex
* FM1 FM2 FM3
*/
// flush, should flush all
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
final long sizeFamily1_4 = region.getStore(FAMILY1).getMemStoreSize();
final long sizeFamily2_4 = region.getStore(FAMILY2).getMemStoreSize();
@@ -417,7 +417,7 @@ public class TestPerColumnFamilyFlush ex
(cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
// Flush!
- region.flushcache(true);
+ region.flushMemstoreShapshot(true);
cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
@@ -501,7 +501,7 @@ public class TestPerColumnFamilyFlush ex
desiredRegion != null);
// Flush the region selectively.
- desiredRegion.flushcache(true);
+ desiredRegion.flushMemstoreShapshot(true);
long totalMemstoreSize;
long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StringBytes;
+import org.junit.Test;
+
+/**
+ * Testcases for MemStoreFlusher
+ */
+public class TestMemStoreFlusher {
+ private static class HRegionServerMock implements HRegionServerIf {
+ RegionServerMetrics metrics;
+ public HRegionServerMock(Configuration conf) {
+ metrics = new RegionServerMetrics(conf);
+ }
+
+ AtomicLong globalMemstoreSize = new AtomicLong(0);
+ @Override
+ public String getRSThreadName() {
+ return "RSThread";
+ }
+
+ @Override
+ public void checkFileSystem() {
+ }
+
+ @Override
+ public boolean requestSplit(HRegionIf r) {
+ return false;
+ }
+
+ @Override
+ public RegionServerMetrics getMetrics() {
+ return metrics;
+ }
+
+ @Override
+ public AtomicLong getGlobalMemstoreSize() {
+ return globalMemstoreSize;
+ }
+
+ @Override
+ public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
+ return null;
+ }
+
+ @Override
+ public void requestCompaction(HRegionIf r, String why) {
+ }
+
+ }
+
+ private static class HRegionMock implements HRegionIf {
+ private final HRegionInfo info;
+ final AtomicInteger flushedCount = new AtomicInteger();
+ /**
+ * The number of times {@link #maxStoreFilesCount()} has been called.
+ */
+ final AtomicInteger checkingCount = new AtomicInteger();
+ final int maxStoreFilesCount;
+
+ public HRegionMock(StringBytes tableName, int maxStoreFilesCount) {
+ info = new HRegionInfo(new HTableDescriptor(tableName.getBytes()),
+ null, null);
+ this.maxStoreFilesCount = maxStoreFilesCount;
+ }
+
+ @Override
+ public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+ throws IOException {
+ flushedCount.addAndGet(1);
+ return true;
+ }
+
+ @Override
+ public List<Pair<Long, Long>> getRecentFlushInfo() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return info;
+ }
+
+ @Override
+ public boolean hasReferences() {
+ return false;
+ }
+
+ @Override
+ public int maxStoreFilesCount() {
+ checkingCount.addAndGet(1);
+ return maxStoreFilesCount;
+ }
+ }
+
+ @Test
+ public void testFlush() throws Exception {
+ final Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1");
+
+ HRegionServerMock server = new HRegionServerMock(conf);
+ MemStoreFlusher flusher = new MemStoreFlusher(conf, server);
+
+ HRegionMock region = new HRegionMock(new StringBytes("testFlush"), 1);
+
+ // Make a flush request
+ flusher.request(region, false);
+
+ flusher.waitAllRequestDone();
+ Assert.assertEquals("Number of flushing", 1, region.flushedCount.get());
+ }
+
+ /**
+ * In MemStoreFlusher, if there are too many store files, the flush should be
+ * delayed to wait for some files to be merged. This test case ensures that
+ * the delay is performed.
+ */
+ @Test
+ public void testDelay() throws Exception {
+ final long blockingWaitTime = 2000;
+
+ final Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1");
+ conf.set(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY, "" + blockingWaitTime);
+
+ HRegionServerMock server = new HRegionServerMock(conf);
+ MemStoreFlusher flusher = new MemStoreFlusher(conf, server);
+
+ // 3 is larger than 1
+ HRegionMock region = new HRegionMock(new StringBytes("testDelay"), 3);
+
+ // Make a flush request
+ flusher.request(region, false);
+
+ flusher.waitAllRequestDone();
+
+ Assert.assertEquals("Number of flushing", 1, region.flushedCount.get());
+ Assert.assertTrue("Number of checking should <= " + 100 + ", but got "
+ + region.checkingCount.get(), region.checkingCount.get() <= 100);
+ }
+}