You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/12 06:24:51 UTC
[2/6] accumulo git commit: ACCUMULO-3509: Make cleanup stateful to
minimize blocking
ACCUMULO-3509: Make cleanup stateful to minimize blocking
By enabling state ( true/false) within the cleanup method, the change will avoid blocking
on a scan session being swept. if the session cleanup blocks because a ScanSession is
still being read, we may block until the ScanBatch returns for that ScanSession.
The change uses a simple semaphore ( purely because I like the word ) to attempt acquisition.
If that fails, we return false from the cleanup and reintroduce that Session back into
the queue to clean up.
Closes apache/accumulo#62
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/46ad8368
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/46ad8368
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/46ad8368
Branch: refs/heads/1.7
Commit: 46ad8368e160c56c03571b467f8ae603c50992f6
Parents: 567f52f
Author: phrocker <ma...@gmail.com>
Authored: Mon Jan 4 10:59:28 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jan 11 21:24:05 2016 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 6 +-
.../org/apache/accumulo/tserver/Tablet.java | 67 +++++--
.../apache/accumulo/tserver/TabletServer.java | 81 +++++++--
.../test/functional/ScanSessionTimeOutIT.java | 15 +-
.../test/functional/SessionBlockVerifyIT.java | 176 +++++++++++++++++++
5 files changed, 305 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 632bb59..9243494 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -225,7 +225,11 @@ public enum Property {
+ " tserver.walog.max.size >= this property."),
TSERV_MEM_MGMT("tserver.memory.manager", "org.apache.accumulo.server.tabletserver.LargestFirstMemoryManager", PropertyType.CLASSNAME,
"An implementation of MemoryManger that accumulo will use."),
- TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "maximum idle time for a session"),
+ TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "When a tablet server's SimpleTimer thread triggers to check "
+ + "idle sessions, this configurable option will be used to evaluate scan sessions to determine if they can be closed due to inactivity"),
+ TSERV_UPDATE_SESSION_MAXIDLE("tserver.session.update.idle.max", "1m", PropertyType.TIMEDURATION,
+ "When a tablet server's SimpleTimer thread triggers to check "
+ + "idle sessions, this configurable option will be used to evaluate update sessions to determine if they can be closed due to inactivity"),
TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
"The maximum number of concurrent read ahead that will execute. This effectively"
+ " limits the number of long running scans that can run concurrently per tserver."),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index efed665..3f00c0b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -39,6 +39,8 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -1761,33 +1763,47 @@ public class Tablet {
private ScanDataSource isolatedDataSource;
private boolean sawException = false;
private boolean scanClosed = false;
+ /**
+ * A fair semaphore of one is used since explicitly know the access pattern will be one thread to read and another to call close if the session becomes
+ * idle. Since we're explicitly preventing re-entrance, we're currently using a Sempahore. If at any point we decide read needs to be re-entrant, we can
+ * switch to a Reentrant lock.
+ */
+ private Semaphore scannerSemaphore;
Scanner(Range range, ScanOptions options) {
this.range = range;
this.options = options;
+ scannerSemaphore = new Semaphore(1, true);
}
- synchronized ScanBatch read() throws IOException, TabletClosedException {
+ ScanBatch read() throws IOException, TabletClosedException {
- if (sawException)
- throw new IllegalStateException("Tried to use scanner after exception occurred.");
-
- if (scanClosed)
- throw new IllegalStateException("Tried to use scanner after it was closed.");
+ ScanDataSource dataSource = null;
Batch results = null;
- ScanDataSource dataSource;
+ try {
- if (options.isolated) {
- if (isolatedDataSource == null)
- isolatedDataSource = new ScanDataSource(options);
- dataSource = isolatedDataSource;
- } else {
- dataSource = new ScanDataSource(options);
- }
+ try {
+ scannerSemaphore.acquire();
+ } catch (InterruptedException e) {
+ sawException = true;
+ }
- try {
+ // sawException may have occurred within close, so we cannot assume that an interrupted exception was its cause
+ if (sawException)
+ throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+ if (scanClosed)
+ throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+ if (options.isolated) {
+ if (isolatedDataSource == null)
+ isolatedDataSource = new ScanDataSource(options);
+ dataSource = isolatedDataSource;
+ } else {
+ dataSource = new ScanDataSource(options);
+ }
SortedKeyValueIterator<Key,Value> iter;
@@ -1834,9 +1850,9 @@ public class Tablet {
} finally {
// code in finally block because always want
// to return mapfiles, even when exception is thrown
- if (!options.isolated)
+ if (null != dataSource && !options.isolated)
dataSource.close(false);
- else if (dataSource.fileManager != null)
+ else if (null != dataSource && dataSource.fileManager != null)
dataSource.fileManager.detach();
synchronized (Tablet.this) {
@@ -1846,19 +1862,32 @@ public class Tablet {
queryBytes += results.numBytes;
}
}
+
+ scannerSemaphore.release();
}
}
// close and read are synchronized because can not call close on the data source while it is in use
// this cloud lead to the case where file iterators that are in use by a thread are returned
// to the pool... this would be bad
- void close() {
+ boolean close() {
options.interruptFlag.set(true);
- synchronized (this) {
+ boolean obtainedLock = false;
+ try {
+ obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
+ if (!obtainedLock)
+ return false;
+
scanClosed = true;
if (isolatedDataSource != null)
isolatedDataSource.close(false);
+ } catch (InterruptedException e) {
+ return false;
+ } finally {
+ if (obtainedLock)
+ scannerSemaphore.release();
}
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 29cf0d3..b7aaf06 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -237,6 +237,8 @@ import org.apache.thrift.server.TServer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
enum ScanRunState {
@@ -386,25 +388,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
String client = TServerUtils.clientAddress.get();
public boolean reserved;
- public void cleanup() {}
+ public boolean cleanup() {
+ return true;
+ }
}
private static class SessionManager {
SecureRandom random;
Map<Long,Session> sessions;
- long maxIdle;
+ private long maxIdle;
+ private long maxUpdateIdle;
+ private List<Session> idleSessions = new ArrayList<Session>();
+ private final Long expiredSessionMarker = new Long(-1);
SessionManager(AccumuloConfiguration conf) {
random = new SecureRandom();
sessions = new HashMap<Long,Session>();
-
+ maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
Runnable r = new Runnable() {
@Override
public void run() {
- sweep(maxIdle);
+ sweep(maxIdle, maxUpdateIdle);
}
};
@@ -506,14 +513,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return session;
}
- private void sweep(long maxIdle) {
+ private void sweep(final long maxIdle, final long maxUpdateIdle) {
ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
synchronized (this) {
Iterator<Session> iter = sessions.values().iterator();
while (iter.hasNext()) {
Session session = iter.next();
+ long configuredIdle = maxIdle;
+ if (session instanceof UpdateSession) {
+ configuredIdle = maxUpdateIdle;
+ }
long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > maxIdle && !session.reserved) {
+ if (idleTime > configuredIdle && !session.reserved) {
log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms");
iter.remove();
sessionsToCleanup.add(session);
@@ -521,10 +532,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
- // do clean up outside of lock
- for (Session session : sessionsToCleanup) {
- session.cleanup();
+ // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list
+
+ synchronized (idleSessions) {
+
+ sessionsToCleanup.addAll(idleSessions);
+
+ idleSessions.clear();
+
+ // perform cleanup for all of the sessions
+ for (Session session : sessionsToCleanup) {
+ if (!session.cleanup())
+ idleSessions.add(session);
+ }
}
+
}
synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
@@ -556,7 +578,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
- for (Entry<Long,Session> entry : sessions.entrySet()) {
+ Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
+
+ synchronized (idleSessions) {
+ /**
+ * Add sessions so that get the list returned in the active scans call
+ */
+ for (Session session : idleSessions) {
+ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
+ }
+ }
+
+ for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
Session session = entry.getValue();
@SuppressWarnings("rawtypes")
@@ -595,11 +628,20 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public synchronized List<ActiveScan> getActiveScans() {
- ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+ final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+ final long ct = System.currentTimeMillis();
+ final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
- long ct = System.currentTimeMillis();
+ synchronized (idleSessions) {
+ /**
+ * Add sessions so that get the list returned in the active scans call
+ */
+ for (Session session : idleSessions) {
+ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
+ }
+ }
- for (Entry<Long,Session> entry : sessions.entrySet()) {
+ for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
Session session = entry.getValue();
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
@@ -841,8 +883,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public AtomicBoolean interruptFlag;
@Override
- public void cleanup() {
+ public boolean cleanup() {
interruptFlag.set(true);
+ return true;
}
}
@@ -879,13 +922,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
@Override
- public void cleanup() {
+ public boolean cleanup() {
try {
if (nextBatchTask != null)
nextBatchTask.cancel(true);
} finally {
if (scanner != null)
- scanner.close();
+ return scanner.close();
+ else
+ return true;
}
}
@@ -908,9 +953,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public KeyExtent threadPoolExtent;
@Override
- public void cleanup() {
+ public boolean cleanup() {
if (lookupTask != null)
lookupTask.cancel(true);
+ // the cancellation should provide us the safety to return true here
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 91fc9eb..6009462 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@ -49,7 +49,7 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), "3"));
+ cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString()));
}
@Override
@@ -63,12 +63,21 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
public void reduceSessionIdle() throws Exception {
InstanceOperations ops = getConnector().instanceOperations();
sessionIdle = ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey());
- ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), "3");
+ ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
log.info("Waiting for existing session idle time to expire");
Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle));
log.info("Finished waiting");
}
+ /**
+ * Returns the max idle time as a string.
+ *
+ * @return new max idle time
+ */
+ protected String getMaxIdleTimeString() {
+ return "3";
+ }
+
@After
public void resetSessionIdle() throws Exception {
if (null != sessionIdle) {
@@ -108,7 +117,7 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
}
- private void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception {
+ protected void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception {
for (int i = start; i < stop; i++) {
Text er = new Text(String.format("%08d", i));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
new file mode 100644
index 0000000..05f304b
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Verify that we have resolved blocking issue by ensuring that we have not lost scan sessions which we know to currently be running
+ */
+public class SessionBlockVerifyIT extends ScanSessionTimeOutIT {
+ private static final Logger log = LoggerFactory.getLogger(SessionBlockVerifyIT.class);
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ Map<String,String> siteConfig = cfg.getSiteConfig();
+ cfg.setNumTservers(1);
+ siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
+ siteConfig.put(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "11");
+ cfg.setSiteConfig(siteConfig);
+ }
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Override
+ protected String getMaxIdleTimeString() {
+ return "1s";
+ }
+
+ ExecutorService service = Executors.newFixedThreadPool(10);
+
+ @Test
+ public void run() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+ for (int i = 0; i < 1000; i++) {
+ Mutation m = new Mutation(new Text(String.format("%08d", i)));
+ for (int j = 0; j < 3; j++)
+ m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ Scanner scanner = c.createScanner(tableName, new Authorizations());
+ scanner.setReadaheadThreshold(20000);
+ scanner.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000)));
+
+ // test by making a slow iterator and then a couple of fast ones.
+ // when then checking we shouldn't have any running except the slow iterator
+ IteratorSetting setting = new IteratorSetting(21, SlowIterator.class);
+ SlowIterator.setSeekSleepTime(setting, Long.MAX_VALUE);
+ SlowIterator.setSleepTime(setting, Long.MAX_VALUE);
+ scanner.addScanIterator(setting);
+
+ final Iterator<Entry<Key,Value>> slow = scanner.iterator();
+
+ final List<Future<Boolean>> callables = new ArrayList<Future<Boolean>>();
+ final CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ Future<Boolean> callable = service.submit(new Callable<Boolean>() {
+ public Boolean call() {
+ latch.countDown();
+ while (slow.hasNext()) {
+
+ slow.next();
+ }
+ return slow.hasNext();
+ }
+ });
+ callables.add(callable);
+ }
+
+ latch.await();
+
+ log.info("Starting SessionBlockVerifyIT");
+
+ // let's add more for good measure.
+ for (int i = 0; i < 2; i++) {
+ Scanner scanner2 = c.createScanner(tableName, new Authorizations());
+
+ scanner2.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000)));
+
+ scanner2.setBatchSize(1);
+ Iterator<Entry<Key,Value>> iter = scanner2.iterator();
+ // call super's verify mechanism
+ verify(iter, 0, 1000);
+
+ }
+
+ int sessionsFound = 0;
+ // we have configured 1 tserver, so we can grab the one and only
+ String tserver = Iterables.getOnlyElement(c.instanceOperations().getTabletServers());
+
+ final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tserver);
+
+ for (ActiveScan scan : scans) {
+ // only here to minimize chance of seeing meta extent scans
+
+ if (tableName.equals(scan.getTable()) && scan.getSsiList().size() > 0) {
+ assertEquals("Not the expected iterator", 1, scan.getSsiList().size());
+ assertTrue("Not the expected iterator", scan.getSsiList().iterator().next().contains("SlowIterator"));
+ sessionsFound++;
+ }
+
+ }
+
+ /**
+ * The message below indicates the problem that we experience within ACCUMULO-3509. The issue manifests as a blockage in the Scanner synchronization that
+ * prevent us from making the close call against it. Since the close blocks until a read is finished, we ultimately have a block within the sweep of
+ * SessionManager. As a result never reap subsequent idle sessions AND we will orphan the sessionsToCleanup in the sweep, leading to an inaccurate count
+ * within sessionsFound.
+ */
+ assertEquals("Must have ten sessions. Failure indicates a synchronization block within the sweep mechanism", 10, sessionsFound);
+ for (Future<Boolean> callable : callables) {
+ callable.cancel(true);
+ }
+ service.shutdown();
+ }
+
+}