You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/01/20 08:11:09 UTC
[hbase] branch master updated: HBASE-25509 ChoreService.cancelChore will not call ScheduledChore.cle… (#2890)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new a37e727 HBASE-25509 ChoreService.cancelChore will not call ScheduledChore.cle… (#2890)
a37e727 is described below
commit a37e72799015ce2a0d6ce98566149a17b31c99ea
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Jan 20 16:10:36 2021 +0800
HBASE-25509 ChoreService.cancelChore will not call ScheduledChore.cle… (#2890)
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../java/org/apache/hadoop/hbase/ChoreService.java | 135 ++-
.../org/apache/hadoop/hbase/ScheduledChore.java | 103 +-
.../org/apache/hadoop/hbase/TestChoreService.java | 1076 +++++++++-----------
.../org/apache/hadoop/hbase/master/HMaster.java | 40 +-
.../hadoop/hbase/master/RegionsRecoveryChore.java | 22 -
.../hbase/master/RegionsRecoveryConfigManager.java | 45 +-
.../apache/hadoop/hbase/master/ServerManager.java | 2 +-
.../hadoop/hbase/master/SplitLogManager.java | 2 +-
.../org/apache/hadoop/hbase/quotas/QuotaCache.java | 2 +-
.../quotas/RegionServerSpaceQuotaManager.java | 4 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 24 +-
.../hbase/regionserver/HeapMemoryManager.java | 2 +-
.../master/TestRegionsRecoveryConfigManager.java | 58 +-
.../hbase/master/janitor/TestCatalogJanitor.java | 2 +-
14 files changed, 718 insertions(+), 799 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index 39c3ccc..5bd67ad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase;
+import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -26,8 +27,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
* Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
*/
@InterfaceAudience.Public
-public class ChoreService implements ChoreServicer {
+public class ChoreService {
private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
/**
@@ -141,28 +140,39 @@ public class ChoreService implements ChoreServicer {
* @return true when the chore was successfully scheduled. false when the scheduling failed
* (typically occurs when a chore is scheduled during shutdown of service)
*/
- public synchronized boolean scheduleChore(ScheduledChore chore) {
+ public boolean scheduleChore(ScheduledChore chore) {
if (chore == null) {
return false;
}
-
- try {
- if (chore.getPeriod() <= 0) {
- LOG.info("Chore {} is disabled because its period is not positive.", chore);
- return false;
- }
- LOG.info("Chore {} is enabled.", chore);
- chore.setChoreServicer(this);
- ScheduledFuture<?> future =
- scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
- chore.getTimeUnit());
- scheduledChores.put(chore, future);
- return true;
- } catch (Exception exception) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Could not successfully schedule chore: " + chore.getName());
+ // always lock chore first to prevent deadlock
+ synchronized (chore) {
+ synchronized (this) {
+ try {
+ // Chores should only ever be scheduled with a single ChoreService. If the choreService
+ // is changing, cancel any existing schedules of this chore.
+ if (chore.getChoreService() == this) {
+ LOG.warn("Chore {} has already been scheduled with us", chore);
+ return false;
+ }
+ if (chore.getPeriod() <= 0) {
+ LOG.info("Chore {} is disabled because its period is not positive.", chore);
+ return false;
+ }
+ LOG.info("Chore {} is enabled.", chore);
+ if (chore.getChoreService() != null) {
+ LOG.info("Cancel chore {} from its previous service", chore);
+ chore.getChoreService().cancelChore(chore);
+ }
+ chore.setChoreService(this);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
+ chore.getPeriod(), chore.getTimeUnit());
+ scheduledChores.put(chore, future);
+ return true;
+ } catch (Exception e) {
+ LOG.error("Could not successfully schedule chore: {}", chore.getName(), e);
+ return false;
+ }
}
- return false;
}
}
@@ -175,19 +185,35 @@ public class ChoreService implements ChoreServicer {
ScheduledFuture<?> future = scheduledChores.get(chore);
future.cancel(false);
}
- scheduleChore(chore);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
+ chore.getPeriod(), chore.getTimeUnit());
+ scheduledChores.put(chore, future);
}
- @InterfaceAudience.Private
- @Override
- public synchronized void cancelChore(ScheduledChore chore) {
+ /**
+ * Cancel any ongoing schedules that this chore has with this ChoreService.
+ * <p/>
+ * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore};
+ * {@link ScheduledChore#cancel()} will then call this method to remove the
+ * {@link ScheduledChore} from this {@link ChoreService}.
+ */
+ @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
+ allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
+ synchronized void cancelChore(ScheduledChore chore) {
cancelChore(chore, true);
}
- @InterfaceAudience.Private
- @Override
- public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
- if (chore != null && scheduledChores.containsKey(chore)) {
+ /**
+ * Cancel any ongoing schedules that this chore has with this ChoreService.
+ * <p/>
+ * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore};
+ * {@link ScheduledChore#cancel(boolean)} will then call this method to remove the
+ * {@link ScheduledChore} from this {@link ChoreService}.
+ */
+ @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
+ allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
+ synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
+ if (scheduledChores.containsKey(chore)) {
ScheduledFuture<?> future = scheduledChores.get(chore);
future.cancel(mayInterruptIfRunning);
scheduledChores.remove(chore);
@@ -201,21 +227,24 @@ public class ChoreService implements ChoreServicer {
}
}
+ /**
+ * @return true when the chore is scheduled with this ChoreService
+ */
@InterfaceAudience.Private
- @Override
public synchronized boolean isChoreScheduled(ScheduledChore chore) {
return chore != null && scheduledChores.containsKey(chore)
&& !scheduledChores.get(chore).isDone();
}
- @InterfaceAudience.Private
- @Override
- public synchronized boolean triggerNow(ScheduledChore chore) {
- if (chore != null) {
- rescheduleChore(chore);
- return true;
- }
- return false;
+ /**
+ * This method tries to execute the chore immediately. If the chore is executing at the time of
+ * this call, the chore will begin another execution as soon as the current execution finishes
+ */
+ @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
+ allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
+ synchronized void triggerNow(ScheduledChore chore) {
+ assert chore.getChoreService() == this;
+ rescheduleChore(chore);
}
/**
@@ -295,10 +324,20 @@ public class ChoreService implements ChoreServicer {
}
}
- @InterfaceAudience.Private
- @Override
- public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
- if (chore == null || !scheduledChores.containsKey(chore)) return;
+ /**
+ * A callback that tells this ChoreService that one of the scheduled chores is
+ * missing its start time. The implication of a chore missing its start time is that the service's
+ * current means of scheduling may not be sufficient to handle the number of ongoing chores (the
+ * other explanation is that the chore's execution time is greater than its scheduled period). The
+ * service should try to increase its concurrency when this callback is received.
+ * @param chore The chore that missed its start time
+ */
+ @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
+ allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
+ synchronized void onChoreMissedStartTime(ScheduledChore chore) {
+ if (!scheduledChores.containsKey(chore)) {
+ return;
+ }
// If the chore has not caused an increase in the size of the core thread pool then request an
// increase. This allows each chore missing its start time to increase the core pool size by
@@ -319,13 +358,17 @@ public class ChoreService implements ChoreServicer {
* shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
* in the middle of execution will be interrupted and shutdown. This service will be unusable
* after this method has been called (i.e. future scheduling attempts will fail).
+ * <p/>
+ * Note that this only removes the chores from this ChoreService; you can still schedule
+ * them with another ChoreService.
*/
public synchronized void shutdown() {
- scheduler.shutdownNow();
- if (LOG.isInfoEnabled()) {
- LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
- + " on shutdown");
+ if (isShutdown()) {
+ return;
}
+ scheduler.shutdownNow();
+ LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix,
+ scheduledChores.keySet());
cancelAllChores(true);
scheduledChores.clear();
choresMissingStartTime.clear();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
index 1fb5b7e..6155bbd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase;
+import com.google.errorprone.annotations.RestrictedApi;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
* execute within the defined period. It is bad practice to define a ScheduledChore whose execution
* time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
* thread pool.
- * <p>
+ * <p/>
* Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
* an entry being added to a queue, etc.
*/
@@ -60,7 +61,7 @@ public abstract class ScheduledChore implements Runnable {
* Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
* not scheduled.
*/
- private ChoreServicer choreServicer;
+ private ChoreService choreService;
/**
* Variables that encapsulate the meaningful state information
@@ -77,39 +78,6 @@ public abstract class ScheduledChore implements Runnable {
*/
private final Stoppable stopper;
- interface ChoreServicer {
- /**
- * Cancel any ongoing schedules that this chore has with the implementer of this interface.
- */
- public void cancelChore(ScheduledChore chore);
- public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
-
- /**
- * @return true when the chore is scheduled with the implementer of this interface
- */
- public boolean isChoreScheduled(ScheduledChore chore);
-
- /**
- * This method tries to execute the chore immediately. If the chore is executing at the time of
- * this call, the chore will begin another execution as soon as the current execution finishes
- * <p>
- * If the chore is not scheduled with a ChoreService, this call will fail.
- * @return false when the chore could not be triggered immediately
- */
- public boolean triggerNow(ScheduledChore chore);
-
- /**
- * A callback that tells the implementer of this interface that one of the scheduled chores is
- * missing its start time. The implication of a chore missing its start time is that the
- * service's current means of scheduling may not be sufficient to handle the number of ongoing
- * chores (the other explanation is that the chore's execution time is greater than its
- * scheduled period). The service should try to increase its concurrency when this callback is
- * received.
- * @param chore The chore that missed its start time
- */
- public void onChoreMissedStartTime(ScheduledChore chore);
- }
-
/**
* This constructor is for test only. It allows us to create an object and to call chore() on it.
*/
@@ -168,8 +136,8 @@ public abstract class ScheduledChore implements Runnable {
onChoreMissedStartTime();
LOG.info("Chore: {} missed its start time", getName());
} else if (stopper.isStopped() || !isScheduled()) {
- cancel(false);
- cleanup();
+ // call shutdown here to cleanup the ScheduledChore.
+ shutdown(false);
LOG.info("Chore: {} was stopped", getName());
} else {
try {
@@ -193,7 +161,6 @@ public abstract class ScheduledChore implements Runnable {
LOG.error("Caught error", t);
if (this.stopper.isStopped()) {
cancel(false);
- cleanup();
}
}
}
@@ -214,7 +181,9 @@ public abstract class ScheduledChore implements Runnable {
* pool threads
*/
private synchronized void onChoreMissedStartTime() {
- if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
+ if (choreService != null) {
+ choreService.onChoreMissedStartTime(this);
+ }
}
/**
@@ -253,20 +222,17 @@ public abstract class ScheduledChore implements Runnable {
* @return false when the Chore is not currently scheduled with a ChoreService
*/
public synchronized boolean triggerNow() {
- if (choreServicer != null) {
- return choreServicer.triggerNow(this);
- } else {
+ if (choreService == null) {
return false;
}
+ choreService.triggerNow(this);
+ return true;
}
- synchronized void setChoreServicer(ChoreServicer service) {
- // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
- // is changing, cancel any existing schedules of this chore.
- if (choreServicer != null && choreServicer != service) {
- choreServicer.cancelChore(this, false);
- }
- choreServicer = service;
+ @RestrictedApi(explanation = "Should only be called in ChoreService", link = "",
+ allowedOnPath = ".*/org/apache/hadoop/hbase/ChoreService.java")
+ synchronized void setChoreService(ChoreService service) {
+ choreService = service;
timeOfThisRun = -1;
}
@@ -275,9 +241,10 @@ public abstract class ScheduledChore implements Runnable {
}
public synchronized void cancel(boolean mayInterruptIfRunning) {
- if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
-
- choreServicer = null;
+ if (isScheduled()) {
+ choreService.cancelChore(this, mayInterruptIfRunning);
+ }
+ choreService = null;
}
public String getName() {
@@ -310,17 +277,14 @@ public abstract class ScheduledChore implements Runnable {
return initialChoreComplete;
}
- @InterfaceAudience.Private
- synchronized ChoreServicer getChoreServicer() {
- return choreServicer;
+ synchronized ChoreService getChoreService() {
+ return choreService;
}
- @InterfaceAudience.Private
synchronized long getTimeOfLastRun() {
return timeOfLastRun;
}
- @InterfaceAudience.Private
synchronized long getTimeOfThisRun() {
return timeOfThisRun;
}
@@ -329,10 +293,12 @@ public abstract class ScheduledChore implements Runnable {
* @return true when this Chore is scheduled with a ChoreService
*/
public synchronized boolean isScheduled() {
- return choreServicer != null && choreServicer.isChoreScheduled(this);
+ return choreService != null && choreService.isChoreScheduled(this);
}
@InterfaceAudience.Private
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
public synchronized void choreForTesting() {
chore();
}
@@ -354,7 +320,26 @@ public abstract class ScheduledChore implements Runnable {
/**
* Override to run cleanup tasks when the Chore encounters an error and must stop running
*/
- protected synchronized void cleanup() {
+ protected void cleanup() {
+ }
+
+ /**
+ * Call {@link #shutdown(boolean)} with {@code true}.
+ * @see ScheduledChore#shutdown(boolean)
+ */
+ public synchronized void shutdown() {
+ shutdown(true);
+ }
+
+ /**
+ * Completely shut down the ScheduledChore, which means we will call cleanup and you should not
+ * schedule it again.
+ * <p/>
+ * This is an alternative way to clean up the chore, besides stopping the passed-in stopper.
+ */
+ public synchronized void shutdown(boolean mayInterruptIfRunning) {
+ cancel(mayInterruptIfRunning);
+ cleanup();
}
/**
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
index 69a171c..64a076a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
@@ -20,16 +20,18 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore;
-import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -38,261 +40,234 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Category(MediumTests.class)
+@Category({ MiscTests.class, MediumTests.class })
public class TestChoreService {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestChoreService.class);
+ HBaseClassTestRule.forClass(TestChoreService.class);
- public static final Logger log = LoggerFactory.getLogger(TestChoreService.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestChoreService.class);
+
+ private static final Configuration CONF = HBaseConfiguration.create();
@Rule
public TestName name = new TestName();
+ private int initialCorePoolSize = 3;
+
+ private ChoreService service;
+
+ @Before
+ public void setUp() {
+ service = new ChoreService(name.getMethodName(), initialCorePoolSize, false);
+ }
+
+ @After
+ public void tearDown() {
+ shutdownService(service);
+ }
+
/**
- * A few ScheduledChore samples that are useful for testing with ChoreService
+ * Straight forward stopper implementation that is used by default when one is not provided
*/
- public static class ScheduledChoreSamples {
- /**
- * Straight forward stopper implementation that is used by default when one is not provided
- */
- public static class SampleStopper implements Stoppable {
- private boolean stopped = false;
-
- @Override
- public void stop(String why) {
- stopped = true;
- }
+ private static class SampleStopper implements Stoppable {
+ private boolean stopped = false;
- @Override
- public boolean isStopped() {
- return stopped;
- }
+ @Override
+ public void stop(String why) {
+ stopped = true;
}
- /**
- * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
- * executions
- */
- public static class SlowChore extends ScheduledChore {
- public SlowChore(String name, int period) {
- this(name, new SampleStopper(), period);
- }
-
- public SlowChore(String name, Stoppable stopper, int period) {
- super(name, stopper, period);
- }
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+ }
- @Override
- protected boolean initialChore() {
- try {
- Thread.sleep(getPeriod() * 2);
- } catch (InterruptedException e) {
- log.warn("", e);
- }
- return true;
- }
+ /**
+ * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
+ * executions
+ */
+ private static class SlowChore extends ScheduledChore {
+ public SlowChore(String name, int period) {
+ this(name, new SampleStopper(), period);
+ }
- @Override
- protected void chore() {
- try {
- Thread.sleep(getPeriod() * 2);
- } catch (InterruptedException e) {
- log.warn("", e);
- }
- }
+ public SlowChore(String name, Stoppable stopper, int period) {
+ super(name, stopper, period);
}
- /**
- * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
- */
- public static class DoNothingChore extends ScheduledChore {
- public DoNothingChore(String name, int period) {
- super(name, new SampleStopper(), period);
- }
+ @Override
+ protected boolean initialChore() {
+ Threads.sleep(getPeriod() * 2);
+ return true;
+ }
- public DoNothingChore(String name, Stoppable stopper, int period) {
- super(name, stopper, period);
- }
+ @Override
+ protected void chore() {
+ Threads.sleep(getPeriod() * 2);
+ }
+ }
- @Override
- protected void chore() {
- // DO NOTHING
- }
+ /**
+ * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
+ */
+ private static class DoNothingChore extends ScheduledChore {
+ public DoNothingChore(String name, int period) {
+ super(name, new SampleStopper(), period);
}
- public static class SleepingChore extends ScheduledChore {
- private int sleepTime;
+ public DoNothingChore(String name, Stoppable stopper, int period) {
+ super(name, stopper, period);
+ }
- public SleepingChore(String name, int chorePeriod, int sleepTime) {
- this(name, new SampleStopper(), chorePeriod, sleepTime);
- }
+ @Override
+ protected void chore() {
+ // DO NOTHING
+ }
+ }
- public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
- super(name, stopper, period);
- this.sleepTime = sleepTime;
- }
+ private static class SleepingChore extends ScheduledChore {
+ private int sleepTime;
- @Override
- protected boolean initialChore() {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- log.warn("", e);
- }
- return true;
- }
+ public SleepingChore(String name, int chorePeriod, int sleepTime) {
+ this(name, new SampleStopper(), chorePeriod, sleepTime);
+ }
- @Override
- protected void chore() {
- try {
- Thread.sleep(sleepTime);
- } catch (Exception e) {
- log.warn("", e);
- }
- }
+ public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
+ super(name, stopper, period);
+ this.sleepTime = sleepTime;
}
- public static class CountingChore extends ScheduledChore {
- private int countOfChoreCalls;
- private boolean outputOnTicks = false;
+ @Override
+ protected boolean initialChore() {
+ Threads.sleep(sleepTime);
+ return true;
+ }
- public CountingChore(String name, int period) {
- this(name, new SampleStopper(), period);
- }
+ @Override
+ protected void chore() {
+ Threads.sleep(sleepTime);
+ }
+ }
- public CountingChore(String name, Stoppable stopper, int period) {
- this(name, stopper, period, false);
- }
+ private static class CountingChore extends ScheduledChore {
+ private int countOfChoreCalls;
+ private boolean outputOnTicks = false;
- public CountingChore(String name, Stoppable stopper, int period,
- final boolean outputOnTicks) {
- super(name, stopper, period);
- this.countOfChoreCalls = 0;
- this.outputOnTicks = outputOnTicks;
- }
+ public CountingChore(String name, int period) {
+ this(name, new SampleStopper(), period);
+ }
- @Override
- protected boolean initialChore() {
- countOfChoreCalls++;
- if (outputOnTicks) {
- outputTickCount();
- }
- return true;
- }
+ public CountingChore(String name, Stoppable stopper, int period) {
+ this(name, stopper, period, false);
+ }
- @Override
- protected void chore() {
- countOfChoreCalls++;
- if (outputOnTicks) {
- outputTickCount();
- }
- }
+ public CountingChore(String name, Stoppable stopper, int period, final boolean outputOnTicks) {
+ super(name, stopper, period);
+ this.countOfChoreCalls = 0;
+ this.outputOnTicks = outputOnTicks;
+ }
- private void outputTickCount() {
- log.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
+ @Override
+ protected boolean initialChore() {
+ countOfChoreCalls++;
+ if (outputOnTicks) {
+ outputTickCount();
}
+ return true;
+ }
- public int getCountOfChoreCalls() {
- return countOfChoreCalls;
+ @Override
+ protected void chore() {
+ countOfChoreCalls++;
+ if (outputOnTicks) {
+ outputTickCount();
}
+ }
- public boolean isOutputtingOnTicks() {
- return outputOnTicks;
- }
+ private void outputTickCount() {
+ LOG.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
+ }
- public void setOutputOnTicks(boolean o) {
- outputOnTicks = o;
- }
+ public int getCountOfChoreCalls() {
+ return countOfChoreCalls;
}
+ }
+
+ /**
+ * A Chore that will try to execute the initial chore a few times before succeeding. Once the
+ * initial chore is complete the chore cancels itself
+ */
+ public static class FailInitialChore extends ScheduledChore {
+ private int numberOfFailures;
+ private int failureThreshold;
/**
- * A Chore that will try to execute the initial chore a few times before succeeding. Once the
- * initial chore is complete the chore cancels itself
+ * @param failThreshold Number of times the Chore fails when trying to execute initialChore
+ * before succeeding.
*/
- public static class FailInitialChore extends ScheduledChore {
- private int numberOfFailures;
- private int failureThreshold;
-
- /**
- * @param failThreshold Number of times the Chore fails when trying to execute initialChore
- * before succeeding.
- */
- public FailInitialChore(String name, int period, int failThreshold) {
- this(name, new SampleStopper(), period, failThreshold);
- }
-
- public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
- super(name, stopper, period);
- numberOfFailures = 0;
- failureThreshold = failThreshold;
- }
+ public FailInitialChore(String name, int period, int failThreshold) {
+ this(name, new SampleStopper(), period, failThreshold);
+ }
- @Override
- protected boolean initialChore() {
- if (numberOfFailures < failureThreshold) {
- numberOfFailures++;
- return false;
- } else {
- return true;
- }
- }
+ public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
+ super(name, stopper, period);
+ numberOfFailures = 0;
+ failureThreshold = failThreshold;
+ }
- @Override
- protected void chore() {
- assertTrue(numberOfFailures == failureThreshold);
- cancel(false);
+ @Override
+ protected boolean initialChore() {
+ if (numberOfFailures < failureThreshold) {
+ numberOfFailures++;
+ return false;
+ } else {
+ return true;
}
+ }
+ @Override
+ protected void chore() {
+ assertTrue(numberOfFailures == failureThreshold);
+ cancel(false);
}
}
@Test
public void testInitialChorePrecedence() throws InterruptedException {
- ChoreService service = new ChoreService("testInitialChorePrecedence");
-
final int period = 100;
final int failureThreshold = 5;
-
- try {
- ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
- service.scheduleChore(chore);
-
- int loopCount = 0;
- boolean brokeOutOfLoop = false;
-
- while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
- Thread.sleep(failureThreshold * period);
- loopCount++;
- if (loopCount > 3) {
- brokeOutOfLoop = true;
- break;
- }
+ ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
+ service.scheduleChore(chore);
+
+ int loopCount = 0;
+ boolean brokeOutOfLoop = false;
+
+ while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
+ Thread.sleep(failureThreshold * period);
+ loopCount++;
+ if (loopCount > 3) {
+ brokeOutOfLoop = true;
+ break;
}
-
- assertFalse(brokeOutOfLoop);
- } finally {
- shutdownService(service);
}
+
+ assertFalse(brokeOutOfLoop);
}
@Test
public void testCancelChore() throws InterruptedException {
final int period = 100;
- ScheduledChore chore1 = new DoNothingChore("chore1", period);
- ChoreService service = new ChoreService("testCancelChore");
- try {
- service.scheduleChore(chore1);
- assertTrue(chore1.isScheduled());
+ ScheduledChore chore = new DoNothingChore("chore", period);
+ service.scheduleChore(chore);
+ assertTrue(chore.isScheduled());
- chore1.cancel(true);
- assertFalse(chore1.isScheduled());
- assertTrue(service.getNumberOfScheduledChores() == 0);
- } finally {
- shutdownService(service);
- }
+ chore.cancel(true);
+ assertFalse(chore.isScheduled());
+ assertTrue(service.getNumberOfScheduledChores() == 0);
}
@Test
@@ -304,12 +279,12 @@ public class TestChoreService {
final TimeUnit UNIT = TimeUnit.NANOSECONDS;
ScheduledChore chore1 =
- new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) {
- @Override
- protected void chore() {
- // DO NOTHING
- }
- };
+ new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) {
+ @Override
+ protected void chore() {
+ // DO NOTHING
+ }
+ };
assertEquals("Name construction failed", NAME, chore1.getName());
assertEquals("Period construction failed", PERIOD, chore1.getPeriod());
@@ -317,12 +292,12 @@ public class TestChoreService {
assertEquals("TimeUnit construction failed", UNIT, chore1.getTimeUnit());
ScheduledChore invalidDelayChore =
- new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) {
- @Override
- protected void chore() {
- // DO NOTHING
- }
- };
+ new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) {
+ @Override
+ protected void chore() {
+ // DO NOTHING
+ }
+ };
assertEquals("Initial Delay should be set to 0 when invalid", 0,
invalidDelayChore.getInitialDelay());
@@ -334,7 +309,7 @@ public class TestChoreService {
final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
ChoreService customInit =
- new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
+ new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
try {
assertEquals(corePoolSize, customInit.getCorePoolSize());
} finally {
@@ -360,258 +335,218 @@ public class TestChoreService {
public void testFrequencyOfChores() throws InterruptedException {
final int period = 100;
// Small delta that acts as time buffer (allowing chores to complete if running slowly)
- final int delta = period/5;
- ChoreService service = new ChoreService("testFrequencyOfChores");
+ final int delta = period / 5;
CountingChore chore = new CountingChore("countingChore", period);
- try {
- service.scheduleChore(chore);
+ service.scheduleChore(chore);
- Thread.sleep(10 * period + delta);
- assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
+ Thread.sleep(10 * period + delta);
+ assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
- Thread.sleep(10 * period + delta);
- assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls());
- } finally {
- shutdownService(service);
- }
+ Thread.sleep(10 * period + delta);
+ assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls());
}
- public void shutdownService(ChoreService service) throws InterruptedException {
+ public void shutdownService(ChoreService service) {
service.shutdown();
- while (!service.isTerminated()) {
- Thread.sleep(100);
- }
+ Waiter.waitFor(CONF, 1000, () -> service.isTerminated());
}
@Test
public void testForceTrigger() throws InterruptedException {
final int period = 100;
- final int delta = period/10;
- ChoreService service = new ChoreService("testForceTrigger");
+ final int delta = period / 10;
final CountingChore chore = new CountingChore("countingChore", period);
- try {
- service.scheduleChore(chore);
- Thread.sleep(10 * period + delta);
-
- assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
-
- // Force five runs of the chore to occur, sleeping between triggers to ensure the
- // chore has time to run
- chore.triggerNow();
- Thread.sleep(delta);
- chore.triggerNow();
- Thread.sleep(delta);
- chore.triggerNow();
- Thread.sleep(delta);
- chore.triggerNow();
- Thread.sleep(delta);
- chore.triggerNow();
- Thread.sleep(delta);
-
- assertEquals("Trigger was called 5 times after 10 periods.", 16,
- chore.getCountOfChoreCalls());
-
- Thread.sleep(10 * period + delta);
-
- // Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing.
- assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(),
- chore.getCountOfChoreCalls() > 16);
- } finally {
- shutdownService(service);
- }
+ service.scheduleChore(chore);
+ Thread.sleep(10 * period + delta);
+
+ assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
+
+ // Force five runs of the chore to occur, sleeping between triggers to ensure the
+ // chore has time to run
+ chore.triggerNow();
+ Thread.sleep(delta);
+ chore.triggerNow();
+ Thread.sleep(delta);
+ chore.triggerNow();
+ Thread.sleep(delta);
+ chore.triggerNow();
+ Thread.sleep(delta);
+ chore.triggerNow();
+ Thread.sleep(delta);
+
+ assertEquals("Trigger was called 5 times after 10 periods.", 16, chore.getCountOfChoreCalls());
+
+ Thread.sleep(10 * period + delta);
+
+ // Be loosey-goosey. It used to be '26' but it was a bit flaky relying on timing.
+ assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(),
+ chore.getCountOfChoreCalls() > 16);
}
@Test
public void testCorePoolIncrease() throws InterruptedException {
- final int initialCorePoolSize = 3;
- ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false);
+ assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize,
+ service.getCorePoolSize());
- try {
- assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize,
- service.getCorePoolSize());
-
- final int slowChorePeriod = 100;
- SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
- SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
- SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);
+ final int slowChorePeriod = 100;
+ SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
+ SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
+ SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);
- service.scheduleChore(slowChore1);
- service.scheduleChore(slowChore2);
- service.scheduleChore(slowChore3);
+ service.scheduleChore(slowChore1);
+ service.scheduleChore(slowChore2);
+ service.scheduleChore(slowChore3);
- Thread.sleep(slowChorePeriod * 10);
- assertEquals("Should not create more pools than scheduled chores", 3,
- service.getCorePoolSize());
+ Thread.sleep(slowChorePeriod * 10);
+ assertEquals("Should not create more pools than scheduled chores", 3,
+ service.getCorePoolSize());
- SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
- service.scheduleChore(slowChore4);
+ SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
+ service.scheduleChore(slowChore4);
- Thread.sleep(slowChorePeriod * 10);
- assertEquals("Chores are missing their start time. Should expand core pool size", 4,
- service.getCorePoolSize());
+ Thread.sleep(slowChorePeriod * 10);
+ assertEquals("Chores are missing their start time. Should expand core pool size", 4,
+ service.getCorePoolSize());
- SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
- service.scheduleChore(slowChore5);
+ SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
+ service.scheduleChore(slowChore5);
- Thread.sleep(slowChorePeriod * 10);
- assertEquals("Chores are missing their start time. Should expand core pool size", 5,
- service.getCorePoolSize());
- } finally {
- shutdownService(service);
- }
+ Thread.sleep(slowChorePeriod * 10);
+ assertEquals("Chores are missing their start time. Should expand core pool size", 5,
+ service.getCorePoolSize());
}
@Test
public void testCorePoolDecrease() throws InterruptedException {
- final int initialCorePoolSize = 3;
- ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false);
final int chorePeriod = 100;
- try {
- // Slow chores always miss their start time and thus the core pool size should be at least as
- // large as the number of running slow chores
- SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
- SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
- SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);
-
- service.scheduleChore(slowChore1);
- service.scheduleChore(slowChore2);
- service.scheduleChore(slowChore3);
-
- Thread.sleep(chorePeriod * 10);
- assertEquals("Should not create more pools than scheduled chores",
- service.getNumberOfScheduledChores(), service.getCorePoolSize());
-
- SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
- service.scheduleChore(slowChore4);
- Thread.sleep(chorePeriod * 10);
- assertEquals("Chores are missing their start time. Should expand core pool size",
- service.getNumberOfScheduledChores(), service.getCorePoolSize());
-
- SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
- service.scheduleChore(slowChore5);
- Thread.sleep(chorePeriod * 10);
- assertEquals("Chores are missing their start time. Should expand core pool size",
- service.getNumberOfScheduledChores(), service.getCorePoolSize());
- assertEquals(5, service.getNumberOfChoresMissingStartTime());
-
- // Now we begin to cancel the chores that caused an increase in the core thread pool of the
- // ChoreService. These cancellations should cause a decrease in the core thread pool.
- slowChore5.cancel();
- Thread.sleep(chorePeriod * 10);
- assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
- service.getCorePoolSize());
- assertEquals(4, service.getNumberOfChoresMissingStartTime());
-
- slowChore4.cancel();
- Thread.sleep(chorePeriod * 10);
- assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
- service.getCorePoolSize());
- assertEquals(3, service.getNumberOfChoresMissingStartTime());
-
- slowChore3.cancel();
- Thread.sleep(chorePeriod * 10);
- assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
- service.getCorePoolSize());
- assertEquals(2, service.getNumberOfChoresMissingStartTime());
-
- slowChore2.cancel();
- Thread.sleep(chorePeriod * 10);
- assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
- service.getCorePoolSize());
- assertEquals(1, service.getNumberOfChoresMissingStartTime());
-
- slowChore1.cancel();
- Thread.sleep(chorePeriod * 10);
- assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
- service.getCorePoolSize());
- assertEquals(0, service.getNumberOfChoresMissingStartTime());
- } finally {
- shutdownService(service);
- }
+ // Slow chores always miss their start time and thus the core pool size should be at least as
+ // large as the number of running slow chores
+ SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
+ SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
+ SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);
+
+ service.scheduleChore(slowChore1);
+ service.scheduleChore(slowChore2);
+ service.scheduleChore(slowChore3);
+
+ Thread.sleep(chorePeriod * 10);
+ assertEquals("Should not create more pools than scheduled chores",
+ service.getNumberOfScheduledChores(), service.getCorePoolSize());
+
+ SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
+ service.scheduleChore(slowChore4);
+ Thread.sleep(chorePeriod * 10);
+ assertEquals("Chores are missing their start time. Should expand core pool size",
+ service.getNumberOfScheduledChores(), service.getCorePoolSize());
+
+ SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
+ service.scheduleChore(slowChore5);
+ Thread.sleep(chorePeriod * 10);
+ assertEquals("Chores are missing their start time. Should expand core pool size",
+ service.getNumberOfScheduledChores(), service.getCorePoolSize());
+ assertEquals(5, service.getNumberOfChoresMissingStartTime());
+
+ // Now we begin to cancel the chores that caused an increase in the core thread pool of the
+ // ChoreService. These cancellations should cause a decrease in the core thread pool.
+ slowChore5.cancel();
+ Thread.sleep(chorePeriod * 10);
+ assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+ service.getCorePoolSize());
+ assertEquals(4, service.getNumberOfChoresMissingStartTime());
+
+ slowChore4.cancel();
+ Thread.sleep(chorePeriod * 10);
+ assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+ service.getCorePoolSize());
+ assertEquals(3, service.getNumberOfChoresMissingStartTime());
+
+ slowChore3.cancel();
+ Thread.sleep(chorePeriod * 10);
+ assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+ service.getCorePoolSize());
+ assertEquals(2, service.getNumberOfChoresMissingStartTime());
+
+ slowChore2.cancel();
+ Thread.sleep(chorePeriod * 10);
+ assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+ service.getCorePoolSize());
+ assertEquals(1, service.getNumberOfChoresMissingStartTime());
+
+ slowChore1.cancel();
+ Thread.sleep(chorePeriod * 10);
+ assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+ service.getCorePoolSize());
+ assertEquals(0, service.getNumberOfChoresMissingStartTime());
}
@Test
public void testNumberOfRunningChores() throws InterruptedException {
- ChoreService service = new ChoreService("testNumberOfRunningChores");
-
final int period = 100;
final int sleepTime = 5;
-
- try {
- DoNothingChore dn1 = new DoNothingChore("dn1", period);
- DoNothingChore dn2 = new DoNothingChore("dn2", period);
- DoNothingChore dn3 = new DoNothingChore("dn3", period);
- DoNothingChore dn4 = new DoNothingChore("dn4", period);
- DoNothingChore dn5 = new DoNothingChore("dn5", period);
-
- service.scheduleChore(dn1);
- service.scheduleChore(dn2);
- service.scheduleChore(dn3);
- service.scheduleChore(dn4);
- service.scheduleChore(dn5);
-
- Thread.sleep(sleepTime);
- assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores());
-
- dn1.cancel();
- Thread.sleep(sleepTime);
- assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores());
-
- dn2.cancel();
- dn3.cancel();
- dn4.cancel();
- Thread.sleep(sleepTime);
- assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores());
-
- dn5.cancel();
- Thread.sleep(sleepTime);
- assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
- } finally {
- shutdownService(service);
- }
+ DoNothingChore dn1 = new DoNothingChore("dn1", period);
+ DoNothingChore dn2 = new DoNothingChore("dn2", period);
+ DoNothingChore dn3 = new DoNothingChore("dn3", period);
+ DoNothingChore dn4 = new DoNothingChore("dn4", period);
+ DoNothingChore dn5 = new DoNothingChore("dn5", period);
+
+ service.scheduleChore(dn1);
+ service.scheduleChore(dn2);
+ service.scheduleChore(dn3);
+ service.scheduleChore(dn4);
+ service.scheduleChore(dn5);
+
+ Thread.sleep(sleepTime);
+ assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores());
+
+ dn1.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores());
+
+ dn2.cancel();
+ dn3.cancel();
+ dn4.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores());
+
+ dn5.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
}
@Test
public void testNumberOfChoresMissingStartTime() throws InterruptedException {
- ChoreService service = new ChoreService("testNumberOfChoresMissingStartTime");
-
final int period = 100;
final int sleepTime = 20 * period;
-
- try {
- // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
- // ALWAYS miss their start time since their execution takes longer than their period
- SlowChore sc1 = new SlowChore("sc1", period);
- SlowChore sc2 = new SlowChore("sc2", period);
- SlowChore sc3 = new SlowChore("sc3", period);
- SlowChore sc4 = new SlowChore("sc4", period);
- SlowChore sc5 = new SlowChore("sc5", period);
-
- service.scheduleChore(sc1);
- service.scheduleChore(sc2);
- service.scheduleChore(sc3);
- service.scheduleChore(sc4);
- service.scheduleChore(sc5);
-
- Thread.sleep(sleepTime);
- assertEquals(5, service.getNumberOfChoresMissingStartTime());
-
- sc1.cancel();
- Thread.sleep(sleepTime);
- assertEquals(4, service.getNumberOfChoresMissingStartTime());
-
- sc2.cancel();
- sc3.cancel();
- sc4.cancel();
- Thread.sleep(sleepTime);
- assertEquals(1, service.getNumberOfChoresMissingStartTime());
-
- sc5.cancel();
- Thread.sleep(sleepTime);
- assertEquals(0, service.getNumberOfChoresMissingStartTime());
- } finally {
- shutdownService(service);
- }
+ // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
+ // ALWAYS miss their start time since their execution takes longer than their period
+ SlowChore sc1 = new SlowChore("sc1", period);
+ SlowChore sc2 = new SlowChore("sc2", period);
+ SlowChore sc3 = new SlowChore("sc3", period);
+ SlowChore sc4 = new SlowChore("sc4", period);
+ SlowChore sc5 = new SlowChore("sc5", period);
+
+ service.scheduleChore(sc1);
+ service.scheduleChore(sc2);
+ service.scheduleChore(sc3);
+ service.scheduleChore(sc4);
+ service.scheduleChore(sc5);
+
+ Thread.sleep(sleepTime);
+ assertEquals(5, service.getNumberOfChoresMissingStartTime());
+
+ sc1.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals(4, service.getNumberOfChoresMissingStartTime());
+
+ sc2.cancel();
+ sc3.cancel();
+ sc4.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals(1, service.getNumberOfChoresMissingStartTime());
+
+ sc5.cancel();
+ Thread.sleep(sleepTime);
+ assertEquals(0, service.getNumberOfChoresMissingStartTime());
}
/**
@@ -621,163 +556,145 @@ public class TestChoreService {
*/
@Test
public void testMaximumChoreServiceThreads() throws InterruptedException {
- ChoreService service = new ChoreService("testMaximumChoreServiceThreads");
final int period = 100;
final int sleepTime = 5 * period;
-
- try {
- // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
- // ALWAYS miss their start time since their execution takes longer than their period.
- // Chores that miss their start time will trigger the onChoreMissedStartTime callback
- // in the ChoreService. This callback will try to increase the number of core pool
- // threads.
- SlowChore sc1 = new SlowChore("sc1", period);
- SlowChore sc2 = new SlowChore("sc2", period);
- SlowChore sc3 = new SlowChore("sc3", period);
- SlowChore sc4 = new SlowChore("sc4", period);
- SlowChore sc5 = new SlowChore("sc5", period);
-
- service.scheduleChore(sc1);
- service.scheduleChore(sc2);
- service.scheduleChore(sc3);
- service.scheduleChore(sc4);
- service.scheduleChore(sc5);
-
- Thread.sleep(sleepTime);
- assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
-
- SlowChore sc6 = new SlowChore("sc6", period);
- SlowChore sc7 = new SlowChore("sc7", period);
- SlowChore sc8 = new SlowChore("sc8", period);
- SlowChore sc9 = new SlowChore("sc9", period);
- SlowChore sc10 = new SlowChore("sc10", period);
-
- service.scheduleChore(sc6);
- service.scheduleChore(sc7);
- service.scheduleChore(sc8);
- service.scheduleChore(sc9);
- service.scheduleChore(sc10);
-
- Thread.sleep(sleepTime);
- assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
- } finally {
- shutdownService(service);
- }
+ // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
+ // ALWAYS miss their start time since their execution takes longer than their period.
+ // Chores that miss their start time will trigger the onChoreMissedStartTime callback
+ // in the ChoreService. This callback will try to increase the number of core pool
+ // threads.
+ SlowChore sc1 = new SlowChore("sc1", period);
+ SlowChore sc2 = new SlowChore("sc2", period);
+ SlowChore sc3 = new SlowChore("sc3", period);
+ SlowChore sc4 = new SlowChore("sc4", period);
+ SlowChore sc5 = new SlowChore("sc5", period);
+
+ service.scheduleChore(sc1);
+ service.scheduleChore(sc2);
+ service.scheduleChore(sc3);
+ service.scheduleChore(sc4);
+ service.scheduleChore(sc5);
+
+ Thread.sleep(sleepTime);
+ assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
+
+ SlowChore sc6 = new SlowChore("sc6", period);
+ SlowChore sc7 = new SlowChore("sc7", period);
+ SlowChore sc8 = new SlowChore("sc8", period);
+ SlowChore sc9 = new SlowChore("sc9", period);
+ SlowChore sc10 = new SlowChore("sc10", period);
+
+ service.scheduleChore(sc6);
+ service.scheduleChore(sc7);
+ service.scheduleChore(sc8);
+ service.scheduleChore(sc9);
+ service.scheduleChore(sc10);
+
+ Thread.sleep(sleepTime);
+ assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
}
@Test
public void testChangingChoreServices() throws InterruptedException {
final int period = 100;
final int sleepTime = 10;
- ChoreService service1 = new ChoreService("testChangingChoreServices_1");
- ChoreService service2 = new ChoreService("testChangingChoreServices_2");
+ ChoreService anotherService = new ChoreService(name.getMethodName() + "_2");
ScheduledChore chore = new DoNothingChore("sample", period);
try {
assertFalse(chore.isScheduled());
- assertFalse(service1.isChoreScheduled(chore));
- assertFalse(service2.isChoreScheduled(chore));
- assertTrue(chore.getChoreServicer() == null);
+ assertFalse(service.isChoreScheduled(chore));
+ assertFalse(anotherService.isChoreScheduled(chore));
+ assertTrue(chore.getChoreService() == null);
- service1.scheduleChore(chore);
+ service.scheduleChore(chore);
Thread.sleep(sleepTime);
assertTrue(chore.isScheduled());
- assertTrue(service1.isChoreScheduled(chore));
- assertFalse(service2.isChoreScheduled(chore));
- assertFalse(chore.getChoreServicer() == null);
+ assertTrue(service.isChoreScheduled(chore));
+ assertFalse(anotherService.isChoreScheduled(chore));
+ assertFalse(chore.getChoreService() == null);
- service2.scheduleChore(chore);
+ anotherService.scheduleChore(chore);
Thread.sleep(sleepTime);
assertTrue(chore.isScheduled());
- assertFalse(service1.isChoreScheduled(chore));
- assertTrue(service2.isChoreScheduled(chore));
- assertFalse(chore.getChoreServicer() == null);
+ assertFalse(service.isChoreScheduled(chore));
+ assertTrue(anotherService.isChoreScheduled(chore));
+ assertFalse(chore.getChoreService() == null);
chore.cancel();
assertFalse(chore.isScheduled());
- assertFalse(service1.isChoreScheduled(chore));
- assertFalse(service2.isChoreScheduled(chore));
- assertTrue(chore.getChoreServicer() == null);
+ assertFalse(service.isChoreScheduled(chore));
+ assertFalse(anotherService.isChoreScheduled(chore));
+ assertTrue(chore.getChoreService() == null);
} finally {
- shutdownService(service1);
- shutdownService(service2);
+ shutdownService(anotherService);
}
}
@Test
public void testStopperForScheduledChores() throws InterruptedException {
- ChoreService service = new ChoreService("testStopperForScheduledChores");
Stoppable stopperForGroup1 = new SampleStopper();
Stoppable stopperForGroup2 = new SampleStopper();
final int period = 100;
- final int delta = period/10;
-
- try {
- ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
- ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
- ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
-
- ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
- ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
- ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);
-
- service.scheduleChore(chore1_group1);
- service.scheduleChore(chore2_group1);
- service.scheduleChore(chore3_group1);
- service.scheduleChore(chore1_group2);
- service.scheduleChore(chore2_group2);
- service.scheduleChore(chore3_group2);
-
- Thread.sleep(delta);
- Thread.sleep(10 * period);
- assertTrue(chore1_group1.isScheduled());
- assertTrue(chore2_group1.isScheduled());
- assertTrue(chore3_group1.isScheduled());
- assertTrue(chore1_group2.isScheduled());
- assertTrue(chore2_group2.isScheduled());
- assertTrue(chore3_group2.isScheduled());
-
- stopperForGroup1.stop("test stopping group 1");
- Thread.sleep(period);
- assertFalse(chore1_group1.isScheduled());
- assertFalse(chore2_group1.isScheduled());
- assertFalse(chore3_group1.isScheduled());
- assertTrue(chore1_group2.isScheduled());
- assertTrue(chore2_group2.isScheduled());
- assertTrue(chore3_group2.isScheduled());
-
- stopperForGroup2.stop("test stopping group 2");
- Thread.sleep(period);
- assertFalse(chore1_group1.isScheduled());
- assertFalse(chore2_group1.isScheduled());
- assertFalse(chore3_group1.isScheduled());
- assertFalse(chore1_group2.isScheduled());
- assertFalse(chore2_group2.isScheduled());
- assertFalse(chore3_group2.isScheduled());
- } finally {
- shutdownService(service);
- }
+ final int delta = period / 10;
+ ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
+ ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
+ ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
+
+ ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
+ ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
+ ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);
+
+ service.scheduleChore(chore1_group1);
+ service.scheduleChore(chore2_group1);
+ service.scheduleChore(chore3_group1);
+ service.scheduleChore(chore1_group2);
+ service.scheduleChore(chore2_group2);
+ service.scheduleChore(chore3_group2);
+
+ Thread.sleep(delta);
+ Thread.sleep(10 * period);
+ assertTrue(chore1_group1.isScheduled());
+ assertTrue(chore2_group1.isScheduled());
+ assertTrue(chore3_group1.isScheduled());
+ assertTrue(chore1_group2.isScheduled());
+ assertTrue(chore2_group2.isScheduled());
+ assertTrue(chore3_group2.isScheduled());
+
+ stopperForGroup1.stop("test stopping group 1");
+ Thread.sleep(period);
+ assertFalse(chore1_group1.isScheduled());
+ assertFalse(chore2_group1.isScheduled());
+ assertFalse(chore3_group1.isScheduled());
+ assertTrue(chore1_group2.isScheduled());
+ assertTrue(chore2_group2.isScheduled());
+ assertTrue(chore3_group2.isScheduled());
+
+ stopperForGroup2.stop("test stopping group 2");
+ Thread.sleep(period);
+ assertFalse(chore1_group1.isScheduled());
+ assertFalse(chore2_group1.isScheduled());
+ assertFalse(chore3_group1.isScheduled());
+ assertFalse(chore1_group2.isScheduled());
+ assertFalse(chore2_group2.isScheduled());
+ assertFalse(chore3_group2.isScheduled());
}
@Test
public void testShutdownCancelsScheduledChores() throws InterruptedException {
final int period = 100;
- ChoreService service = new ChoreService("testShutdownCancelsScheduledChores");
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
+ assertTrue(service.scheduleChore(successChore1));
+ assertTrue(successChore1.isScheduled());
+ assertTrue(service.scheduleChore(successChore2));
+ assertTrue(successChore2.isScheduled());
+ assertTrue(service.scheduleChore(successChore3));
+ assertTrue(successChore3.isScheduled());
- try {
- assertTrue(service.scheduleChore(successChore1));
- assertTrue(successChore1.isScheduled());
- assertTrue(service.scheduleChore(successChore2));
- assertTrue(successChore2.isScheduled());
- assertTrue(service.scheduleChore(successChore3));
- assertTrue(successChore3.isScheduled());
- } finally {
- shutdownService(service);
- }
+ shutdownService(service);
assertFalse(successChore1.isScheduled());
assertFalse(successChore2.isScheduled());
@@ -788,34 +705,28 @@ public class TestChoreService {
public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
final int period = 100;
final int sleep = 5 * period;
- ChoreService service = new ChoreService("testShutdownWorksWhileChoresAreExecuting");
ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
- try {
- assertTrue(service.scheduleChore(slowChore1));
- assertTrue(service.scheduleChore(slowChore2));
- assertTrue(service.scheduleChore(slowChore3));
+ assertTrue(service.scheduleChore(slowChore1));
+ assertTrue(service.scheduleChore(slowChore2));
+ assertTrue(service.scheduleChore(slowChore3));
- Thread.sleep(sleep / 2);
- shutdownService(service);
+ Thread.sleep(sleep / 2);
+ shutdownService(service);
- assertFalse(slowChore1.isScheduled());
- assertFalse(slowChore2.isScheduled());
- assertFalse(slowChore3.isScheduled());
- assertTrue(service.isShutdown());
+ assertFalse(slowChore1.isScheduled());
+ assertFalse(slowChore2.isScheduled());
+ assertFalse(slowChore3.isScheduled());
+ assertTrue(service.isShutdown());
- Thread.sleep(5);
- assertTrue(service.isTerminated());
- } finally {
- shutdownService(service);
- }
+ Thread.sleep(5);
+ assertTrue(service.isTerminated());
}
@Test
public void testShutdownRejectsNewSchedules() throws InterruptedException {
final int period = 100;
- ChoreService service = new ChoreService("testShutdownRejectsNewSchedules");
ScheduledChore successChore1 = new DoNothingChore("sc1", period);
ScheduledChore successChore2 = new DoNothingChore("sc2", period);
ScheduledChore successChore3 = new DoNothingChore("sc3", period);
@@ -823,16 +734,14 @@ public class TestChoreService {
ScheduledChore failChore2 = new DoNothingChore("fc2", period);
ScheduledChore failChore3 = new DoNothingChore("fc3", period);
- try {
- assertTrue(service.scheduleChore(successChore1));
- assertTrue(successChore1.isScheduled());
- assertTrue(service.scheduleChore(successChore2));
- assertTrue(successChore2.isScheduled());
- assertTrue(service.scheduleChore(successChore3));
- assertTrue(successChore3.isScheduled());
- } finally {
- shutdownService(service);
- }
+ assertTrue(service.scheduleChore(successChore1));
+ assertTrue(successChore1.isScheduled());
+ assertTrue(service.scheduleChore(successChore2));
+ assertTrue(successChore2.isScheduled());
+ assertTrue(service.scheduleChore(successChore3));
+ assertTrue(successChore3.isScheduled());
+
+ shutdownService(service);
assertFalse(service.scheduleChore(failChore1));
assertFalse(failChore1.isScheduled());
@@ -845,17 +754,38 @@ public class TestChoreService {
/**
* for HBASE-25014
*/
- @Test(timeout = 10000)
+ @Test
public void testInitialDelay() {
- ChoreService service = new ChoreService(name.getMethodName());
SampleStopper stopper = new SampleStopper();
service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) {
- @Override protected void chore() {
+ @Override
+ protected void chore() {
stopper.stop("test");
}
});
- while (!stopper.isStopped()) {
- Threads.sleep(1000);
- }
+ Waiter.waitFor(CONF, 5000, () -> stopper.isStopped());
+ }
+
+ @Test
+ public void testCleanupWithStopper() {
+ SampleStopper stopper = new SampleStopper();
+ DoNothingChore chore = spy(new DoNothingChore("chore", stopper, 10));
+ service.scheduleChore(chore);
+ assertTrue(chore.isScheduled());
+ verify(chore, never()).cleanup();
+ stopper.stop("test");
+ Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
+ verify(chore, atLeastOnce()).cleanup();
+ }
+
+ @Test
+ public void testCleanupWithShutdown() {
+ DoNothingChore chore = spy(new DoNothingChore("chore", 10));
+ service.scheduleChore(chore);
+ assertTrue(chore.isScheduled());
+ verify(chore, never()).cleanup();
+ chore.shutdown(true);
+ Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
+ verify(chore, atLeastOnce()).cleanup();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cbe001e..94f3bf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -55,7 +55,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -1500,11 +1499,9 @@ public class HMaster extends HRegionServer implements MasterServices {
try {
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
if (on) {
- if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
- getChoreService().scheduleChore(this.snapshotCleanerChore);
- }
+ getChoreService().scheduleChore(this.snapshotCleanerChore);
} else {
- getChoreService().cancelChore(this.snapshotCleanerChore);
+ this.snapshotCleanerChore.cancel();
}
} catch (KeeperException e) {
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
@@ -1528,24 +1525,23 @@ public class HMaster extends HRegionServer implements MasterServices {
}
private void stopChores() {
- ChoreService choreService = getChoreService();
- if (choreService != null) {
- choreService.cancelChore(this.mobFileCleanerChore);
- choreService.cancelChore(this.mobFileCompactionChore);
- choreService.cancelChore(this.balancerChore);
+ if (getChoreService() != null) {
+ shutdownChore(mobFileCleanerChore);
+ shutdownChore(mobFileCompactionChore);
+ shutdownChore(balancerChore);
if (regionNormalizerManager != null) {
- choreService.cancelChore(regionNormalizerManager.getRegionNormalizerChore());
- }
- choreService.cancelChore(this.clusterStatusChore);
- choreService.cancelChore(this.catalogJanitorChore);
- choreService.cancelChore(this.clusterStatusPublisherChore);
- choreService.cancelChore(this.snapshotQuotaChore);
- choreService.cancelChore(this.logCleaner);
- choreService.cancelChore(this.hfileCleaner);
- choreService.cancelChore(this.replicationBarrierCleaner);
- choreService.cancelChore(this.snapshotCleanerChore);
- choreService.cancelChore(this.hbckChore);
- choreService.cancelChore(this.regionsRecoveryChore);
+ shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
+ }
+ shutdownChore(clusterStatusChore);
+ shutdownChore(catalogJanitorChore);
+ shutdownChore(clusterStatusPublisherChore);
+ shutdownChore(snapshotQuotaChore);
+ shutdownChore(logCleaner);
+ shutdownChore(hfileCleaner);
+ shutdownChore(replicationBarrierCleaner);
+ shutdownChore(snapshotCleanerChore);
+ shutdownChore(hbckChore);
+ shutdownChore(regionsRecoveryChore);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
index a756715..5597cca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HConstants;
@@ -70,7 +69,6 @@ public class RegionsRecoveryChore extends ScheduledChore {
*/
RegionsRecoveryChore(final Stoppable stopper, final Configuration configuration,
final HMaster hMaster) {
-
super(REGIONS_RECOVERY_CHORE_NAME, stopper, configuration.getInt(
HConstants.REGIONS_RECOVERY_INTERVAL, HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL));
this.hMaster = hMaster;
@@ -125,7 +123,6 @@ public class RegionsRecoveryChore extends ScheduledChore {
private Map<TableName, List<byte[]>> getTableToRegionsByRefCount(
final Map<ServerName, ServerMetrics> serverMetricsMap) {
-
final Map<TableName, List<byte[]>> tableToReopenRegionsMap = new HashMap<>();
for (ServerMetrics serverMetrics : serverMetricsMap.values()) {
Map<byte[], RegionMetrics> regionMetricsMap = serverMetrics.getRegionMetrics();
@@ -146,13 +143,11 @@ public class RegionsRecoveryChore extends ScheduledChore {
}
}
return tableToReopenRegionsMap;
-
}
private void prepareTableToReopenRegionsMap(
final Map<TableName, List<byte[]>> tableToReopenRegionsMap,
final byte[] regionName, final int regionStoreRefCount) {
-
final RegionInfo regionInfo = hMaster.getAssignmentManager().getRegionInfo(regionName);
final TableName tableName = regionInfo.getTable();
if (TableName.isMetaTableName(tableName)) {
@@ -165,21 +160,4 @@ public class RegionsRecoveryChore extends ScheduledChore {
tableToReopenRegionsMap
.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
}
-
- // hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
- // is scheduled at a time - RegionsRecoveryConfigManager
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- return o != null && getClass() == o.getClass();
- }
-
- @Override
- public int hashCode() {
- return 31;
- }
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java
index b1bfdc0..78777a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryConfigManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master;
+import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HConstants;
@@ -27,8 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Config manager for RegionsRecovery Chore - Dynamically reload config and update chore
- * accordingly
+ * Config manager for RegionsRecovery Chore - Dynamically reload config and update chore accordingly
*/
@InterfaceAudience.Private
public class RegionsRecoveryConfigManager implements ConfigurationObserver {
@@ -36,6 +36,7 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RegionsRecoveryConfigManager.class);
private final HMaster hMaster;
+ private RegionsRecoveryChore chore;
private int prevMaxStoreFileRefCount;
private int prevRegionsRecoveryInterval;
@@ -51,34 +52,35 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
final int newMaxStoreFileRefCount = getMaxStoreFileRefCount(conf);
final int newRegionsRecoveryInterval = getRegionsRecoveryChoreInterval(conf);
- if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount
- && prevRegionsRecoveryInterval == newRegionsRecoveryInterval) {
+ if (prevMaxStoreFileRefCount == newMaxStoreFileRefCount &&
+ prevRegionsRecoveryInterval == newRegionsRecoveryInterval) {
// no need to re-schedule the chore with updated config
// as there is no change in desired configs
return;
}
- LOG.info("Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," +
+ LOG.info(
+ "Config Reload for RegionsRecovery Chore. prevMaxStoreFileRefCount: {}," +
" newMaxStoreFileRefCount: {}, prevRegionsRecoveryInterval: {}, " +
- "newRegionsRecoveryInterval: {}", prevMaxStoreFileRefCount, newMaxStoreFileRefCount,
- prevRegionsRecoveryInterval, newRegionsRecoveryInterval);
+ "newRegionsRecoveryInterval: {}",
+ prevMaxStoreFileRefCount, newMaxStoreFileRefCount, prevRegionsRecoveryInterval,
+ newRegionsRecoveryInterval);
- RegionsRecoveryChore regionsRecoveryChore = new RegionsRecoveryChore(this.hMaster,
- conf, this.hMaster);
+ RegionsRecoveryChore regionsRecoveryChore =
+ new RegionsRecoveryChore(this.hMaster, conf, this.hMaster);
ChoreService choreService = this.hMaster.getChoreService();
// Regions Reopen based on very high storeFileRefCount is considered enabled
// only if hbase.regions.recovery.store.file.ref.count has value > 0
-
synchronized (this) {
+ if (chore != null) {
+ chore.shutdown();
+ chore = null;
+ }
if (newMaxStoreFileRefCount > 0) {
- // reschedule the chore
- // provide mayInterruptIfRunning - false to take care of completion
- // of in progress task if any
- choreService.cancelChore(regionsRecoveryChore, false);
+ // schedule the new chore
choreService.scheduleChore(regionsRecoveryChore);
- } else {
- choreService.cancelChore(regionsRecoveryChore, false);
+ chore = regionsRecoveryChore;
}
this.prevMaxStoreFileRefCount = newMaxStoreFileRefCount;
this.prevRegionsRecoveryInterval = newRegionsRecoveryInterval;
@@ -86,15 +88,18 @@ public class RegionsRecoveryConfigManager implements ConfigurationObserver {
}
private int getMaxStoreFileRefCount(Configuration configuration) {
- return configuration.getInt(
- HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
+ return configuration.getInt(HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
}
private int getRegionsRecoveryChoreInterval(Configuration configuration) {
- return configuration.getInt(
- HConstants.REGIONS_RECOVERY_INTERVAL,
+ return configuration.getInt(HConstants.REGIONS_RECOVERY_INTERVAL,
HConstants.DEFAULT_REGIONS_RECOVERY_INTERVAL);
}
+ @RestrictedApi(explanation = "Only visible for testing", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ RegionsRecoveryChore getChore() {
+ return chore;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 8977174..f91f040 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -936,7 +936,7 @@ public class ServerManager {
*/
public void stop() {
if (flushedSeqIdFlusher != null) {
- flushedSeqIdFlusher.cancel();
+ flushedSeqIdFlusher.shutdown();
}
if (persistFlushedSequenceId) {
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index f628841..186a8ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -456,7 +456,7 @@ public class SplitLogManager {
choreService.shutdown();
}
if (timeoutMonitor != null) {
- timeoutMonitor.cancel(true);
+ timeoutMonitor.shutdown(true);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index bd1bff1..f2d88ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -102,7 +102,7 @@ public class QuotaCache implements Stoppable {
public void stop(final String why) {
if (refreshChore != null) {
LOG.debug("Stopping QuotaRefresherChore chore.");
- refreshChore.cancel(true);
+ refreshChore.shutdown(true);
}
stopped = true;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
index 81e7e87..282075b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -98,11 +98,11 @@ public class RegionServerSpaceQuotaManager {
public synchronized void stop() {
if (spaceQuotaRefresher != null) {
- spaceQuotaRefresher.cancel();
+ spaceQuotaRefresher.shutdown();
spaceQuotaRefresher = null;
}
if (regionSizeReporter != null) {
- regionSizeReporter.cancel();
+ regionSizeReporter.shutdown();
regionSizeReporter = null;
}
started = false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index bcb1436..e40e251 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2642,6 +2642,11 @@ public class HRegionServer extends Thread implements
}
}
+ protected final void shutdownChore(ScheduledChore chore) {
+ if (chore != null) {
+ chore.shutdown();
+ }
+ }
/**
* Wait on all threads to finish. Presumption is that all closes and stops
* have already been called.
@@ -2649,15 +2654,16 @@ public class HRegionServer extends Thread implements
protected void stopServiceThreads() {
// clean up the scheduled chores
if (this.choreService != null) {
- choreService.cancelChore(nonceManagerChore);
- choreService.cancelChore(compactionChecker);
- choreService.cancelChore(periodicFlusher);
- choreService.cancelChore(healthCheckChore);
- choreService.cancelChore(executorStatusChore);
- choreService.cancelChore(storefileRefresher);
- choreService.cancelChore(fsUtilizationChore);
- choreService.cancelChore(slowLogTableOpsChore);
- // clean up the remaining scheduled chores (in case we missed out any)
+ shutdownChore(nonceManagerChore);
+ shutdownChore(compactionChecker);
+ shutdownChore(periodicFlusher);
+ shutdownChore(healthCheckChore);
+ shutdownChore(executorStatusChore);
+ shutdownChore(storefileRefresher);
+ shutdownChore(fsUtilizationChore);
+ shutdownChore(slowLogTableOpsChore);
+ // cancel the remaining scheduled chores (in case we missed any)
+ // TODO: cancel will not clean up the chores, so we need to make sure we do not miss any
choreService.shutdown();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 1f831ee..342ec18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -216,7 +216,7 @@ public class HeapMemoryManager {
public void stop() {
// The thread is Daemon. Just interrupting the ongoing process.
LOG.info("Stopping");
- this.heapMemTunerChore.cancel(true);
+ this.heapMemTunerChore.shutdown(true);
}
public void registerTuneObserver(HeapMemoryTuneObserver observer) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java
index d29e061..6819e5d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java
@@ -18,18 +18,18 @@
package org.apache.hadoop.hbase.master;
-import java.io.IOException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -38,7 +38,7 @@ import org.junit.experimental.categories.Category;
/**
* Test for Regions Recovery Config Manager
*/
-@Category({MasterTests.class, MediumTests.class})
+@Category({ MasterTests.class, MediumTests.class })
public class TestRegionsRecoveryConfigManager {
@ClassRule
@@ -51,8 +51,6 @@ public class TestRegionsRecoveryConfigManager {
private HMaster hMaster;
- private RegionsRecoveryChore regionsRecoveryChore;
-
private RegionsRecoveryConfigManager regionsRecoveryConfigManager;
private Configuration conf;
@@ -62,10 +60,8 @@ public class TestRegionsRecoveryConfigManager {
conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset("hbase.regions.recovery.store.file.ref.count");
conf.unset("hbase.master.regions.recovery.check.interval");
- StartMiniClusterOption option = StartMiniClusterOption.builder()
- .masterClass(TestHMaster.class)
- .numRegionServers(1)
- .numDataNodes(1).build();
+ StartMiniClusterOption option = StartMiniClusterOption.builder().masterClass(TestHMaster.class)
+ .numRegionServers(1).numDataNodes(1).build();
HBASE_TESTING_UTILITY.startMiniCluster(option);
cluster = HBASE_TESTING_UTILITY.getMiniHBaseCluster();
}
@@ -77,44 +73,44 @@ public class TestRegionsRecoveryConfigManager {
@Test
public void testChoreSchedule() throws Exception {
-
this.hMaster = cluster.getMaster();
- Stoppable stoppable = new StoppableImplementation();
- this.regionsRecoveryChore = new RegionsRecoveryChore(stoppable, conf, hMaster);
-
this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this.hMaster);
// not yet scheduled
- Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertFalse(
+ hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// not yet scheduled
- Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertFalse(
+ hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
conf.setInt("hbase.master.regions.recovery.check.interval", 10);
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// not yet scheduled - missing config: hbase.regions.recovery.store.file.ref.count
- Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertFalse(
+ hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
conf.setInt("hbase.regions.recovery.store.file.ref.count", 10);
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// chore scheduled
- Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// chore re-scheduled
- Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
conf.setInt("hbase.regions.recovery.store.file.ref.count", 20);
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// chore scheduling untouched
- Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertTrue(hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
conf.unset("hbase.regions.recovery.store.file.ref.count");
this.regionsRecoveryConfigManager.onConfigurationChange(conf);
// chore un-scheduled
- Assert.assertFalse(hMaster.getChoreService().isChoreScheduled(regionsRecoveryChore));
+ assertFalse(
+ hMaster.getChoreService().isChoreScheduled(regionsRecoveryConfigManager.getChore()));
}
// Make it public so that JVMClusterUtil can access it.
@@ -123,24 +119,4 @@ public class TestRegionsRecoveryConfigManager {
super(conf);
}
}
-
- /**
- * Simple helper class that just keeps track of whether or not its stopped.
- */
- private static class StoppableImplementation implements Stoppable {
-
- private boolean stop = false;
-
- @Override
- public void stop(String why) {
- this.stop = true;
- }
-
- @Override
- public boolean isStopped() {
- return this.stop;
- }
-
- }
-
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java
index 2522768..b1fdf26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/janitor/TestCatalogJanitor.java
@@ -111,7 +111,7 @@ public class TestCatalogJanitor {
@After
public void teardown() {
- this.janitor.cancel(true);
+ this.janitor.shutdown(true);
this.masterServices.stop("DONE");
}