You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/07 17:56:20 UTC
[43/50] [abbrv] hbase git commit: HBASE-14922 Delayed flush doesn't
work causing flush storms.
HBASE-14922 Delayed flush doesn't work causing flush storms.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd5ddc5f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd5ddc5f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd5ddc5f
Branch: refs/heads/hbase-12439
Commit: cd5ddc5fece7147d55d74117abb272cf8ea08c4d
Parents: c6b8e6f
Author: Elliott Clark <ec...@apache.org>
Authored: Thu Dec 3 10:40:40 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Fri Dec 4 16:54:01 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ChoreService.java | 31 ++++-
.../JitterScheduledThreadPoolExecutorImpl.java | 123 +++++++++++++++++++
.../apache/hadoop/hbase/TestChoreService.java | 10 +-
.../hbase/regionserver/HRegionServer.java | 6 +-
4 files changed, 157 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index 2519f8f..091d854 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
@@ -87,11 +88,21 @@ public class ChoreService implements ChoreServicer {
private final String coreThreadPoolPrefix;
/**
+ *
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
* spawned by this service
*/
+ @VisibleForTesting
public ChoreService(final String coreThreadPoolPrefix) {
- this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
+ this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
+ }
+
+ /**
+ * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
+ * to true this will add -10% to 10% jitter.
+ */
+ public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
+ this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
}
/**
@@ -101,11 +112,19 @@ public class ChoreService implements ChoreServicer {
* to during initialization. The default size is 1, but specifying a larger size may be
* beneficial if you know that 1 thread will not be enough.
*/
- public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
+ public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
this.coreThreadPoolPrefix = coreThreadPoolPrefix;
- if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
+ if (corePoolSize < MIN_CORE_POOL_SIZE) {
+ corePoolSize = MIN_CORE_POOL_SIZE;
+ }
+
final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
- scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+ if (jitter) {
+ scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
+ } else {
+ scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+ }
+
scheduler.setRemoveOnCancelPolicy(true);
scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
@@ -127,7 +146,9 @@ public class ChoreService implements ChoreServicer {
* (typically occurs when a chore is scheduled during shutdown of service)
*/
public synchronized boolean scheduleChore(ScheduledChore chore) {
- if (chore == null) return false;
+ if (chore == null) {
+ return false;
+ }
try {
chore.setChoreServicer(this);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
new file mode 100644
index 0000000..95efa5a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay.
+ *
+ * This will spread out things on a distributed cluster.
+ */
+public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
+ private final double spread;
+
+ /**
+ * Main constructor.
+ * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered.
+ */
+ public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
+ ThreadFactory threadFactory,
+ double spread) {
+ super(corePoolSize, threadFactory);
+ this.spread = spread;
+ }
+
+ protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
+ Runnable runnable, java.util.concurrent.RunnableScheduledFuture<V> task) {
+ return new JitteredRunnableScheduledFuture<>(task);
+ }
+
+
+ protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
+ Callable<V> callable, java.util.concurrent.RunnableScheduledFuture<V> task) {
+ return new JitteredRunnableScheduledFuture<>(task);
+ }
+
+ /**
+ * Class that basically just defers to the wrapped future.
+ * The only exception is getDelay
+ */
+ protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
+ private final RunnableScheduledFuture<V> wrapped;
+ JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public boolean isPeriodic() {
+ return wrapped.isPeriodic();
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ long baseDelay = wrapped.getDelay(unit);
+ long spreadTime = (long) (baseDelay * spread);
+ long delay = baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
+ // Ensure that we don't roll over for nanoseconds.
+ return (delay < 0) ? baseDelay : delay;
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ return wrapped.compareTo(o);
+ }
+
+ @Override
+ public void run() {
+ wrapped.run();
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return wrapped.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return wrapped.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return wrapped.isDone();
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ return wrapped.get();
+ }
+
+ @Override
+ public V get(long timeout,
+ TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return wrapped.get(timeout, unit);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
index b113174..d1a6c19 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
@@ -315,7 +315,7 @@ public class TestChoreService {
final int corePoolSize = 10;
final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
- ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize);
+ ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
try {
assertEquals(corePoolSize, customInit.getCorePoolSize());
} finally {
@@ -329,11 +329,11 @@ public class TestChoreService {
shutdownService(defaultInit);
}
- ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10);
+ ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false);
try {
assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
} finally {
- shutdownService(invalidInit);
+ shutdownService(invalidInit);
}
}
@@ -403,7 +403,7 @@ public class TestChoreService {
@Test (timeout=20000)
public void testCorePoolIncrease() throws InterruptedException {
final int initialCorePoolSize = 3;
- ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize);
+ ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false);
try {
assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize,
@@ -443,7 +443,7 @@ public class TestChoreService {
@Test(timeout = 30000)
public void testCorePoolDecrease() throws InterruptedException {
final int initialCorePoolSize = 3;
- ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize);
+ ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false);
final int chorePeriod = 100;
try {
// Slow chores always miss their start time and thus the core pool size should be at least as
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd5ddc5f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2ce2193..211fed5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
- this.choreService = new ChoreService(getServerName().toString());
+ this.choreService = new ChoreService(getServerName().toString(), true);
if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), new SignalHandler() {
@@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements
static class PeriodicMemstoreFlusher extends ScheduledChore {
final HRegionServer server;
- final static int RANGE_OF_DELAY = 20000; //millisec
- final static int MIN_DELAY_TIME = 3000; //millisec
+ final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
+ final static int MIN_DELAY_TIME = 0; // millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;